博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用zookeeper作为分布式锁以及设计一种通知监听模式
阅读量:5165 次
发布时间:2019-06-13

本文共 4432 字,大约阅读时间需要 14 分钟。

1、创建实例 /**  * 初始化单例的便捷方法  */
public static void init() {   getInstance();}

  

/**  * 获取单例  * @return  */
public static ZooKeeperSession getInstance() {   return Singleton.getInstance();}

  

/**  * 封装单例的静态内部类  * @author Administrator  *  */
private static class Singleton {      private static ZooKeeperSession instance;      static {      instance = new ZooKeeperSession();   }      public static ZooKeeperSession getInstance() {      return instance;   }   }

 

zookeeper server,创建会话的时候,是异步去进行的,所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接。
public ZooKeeperSession() {		// 去连接zookeeper server,创建会话的时候,是异步去进行的		// 所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接		try {			this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher());			// 给一个状态CONNECTING,连接中			System.out.println(zookeeper.getState());						try {				// CountDownLatch				// java多线程并发同步的一个工具类				// 会传递进去一些数字,比如说1,2 ,3 都可以				// 然后await(),如果数字不是0,那么久卡住,等待								// 其他的线程可以调用coutnDown(),减1				// 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态				// 继续向下运行								connectedSemaphore.await();			} catch(InterruptedException e) {				e.printStackTrace();			}			System.out.println("ZooKeeper session established......");		} catch (Exception e) {			e.printStackTrace();		}	}

  

 

/**  * 建立zk session的watcher  * @author Administrator  *  */
private class ZooKeeperWatcher implements Watcher {   public void process(WatchedEvent event) {      System.out.println("Receive watched event: " + event.getState());      if(KeeperState.SyncConnected == event.getState()) {         connectedSemaphore.countDown();      }    }   }

  

********************************************************************************************************** 

/**  * 获取分布式锁  * @param productId  */
public void acquireDistributedLock(Long productId) {		String path = "/product-lock-" + productId;			try {			zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);			System.out.println("success to acquire lock for product[id=" + productId + "]");  		} catch (Exception e) {			// 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错			// NodeExistsException			int count = 0;			while(true) {				try {					Thread.sleep(1000); 					zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);				} catch (Exception e2) {					count++;					System.out.println("the " + count + " times try to acquire lock for product[id=" + productId + "]......");					continue;				}				System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......");				break;			}		}	}

 

/**  * 释放掉一个分布式锁  * @param productId  */
public void releaseDistributedLock(Long productId) {		String path = "/product-lock-" + productId;		try {			zookeeper.delete(path, -1); 			System.out.println("release the lock for product[id=" + productId + "]......");  		} catch (Exception e) {			e.printStackTrace();		}	}

**********************************************************************************************************

2、实例应用

          // 加代码,在将数据直接写入redis缓存之前,应该先获取一个zk的分布式锁		ZooKeeperSession zkSession = ZooKeeperSession.getInstance();		zkSession.acquireDistributedLock(productId);  				// 获取到了锁		// 先从redis中获取数据		ProductInfo existedProductInfo = cacheService.getProductInfoFromReidsCache(productId);				if(existedProductInfo != null) {			// 比较当前数据的时间版本比已有数据的时间版本是新还是旧			try {				Date date = sdf.parse(productInfo.getModifiedTime());				Date existedDate = sdf.parse(existedProductInfo.getModifiedTime());								if(date.before(existedDate)) {					System.out.println("current date[" + productInfo.getModifiedTime() + "] is before existed date[" + existedProductInfo.getModifiedTime() + "]");					return;				}			} catch (Exception e) {				e.printStackTrace();			}			System.out.println("current date[" + productInfo.getModifiedTime() + "] is after existed date[" + existedProductInfo.getModifiedTime() + "]");		} else {			System.out.println("existed product info is null......");   		}				try {			Thread.sleep(10 * 1000);		} catch (InterruptedException e) {			e.printStackTrace();		}				cacheService.saveProductInfo2LocalCache(productInfo);		System.out.println("===================获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId));  		cacheService.saveProductInfo2ReidsCache(productInfo);  				// 释放分布式锁		zkSession.releaseDistributedLock(productId);

  

 

转载于:https://www.cnblogs.com/gxyandwmm/p/11437427.html

你可能感兴趣的文章
NSPredicate的使用,超级强大
查看>>
自动分割mp3等音频视频文件的脚本
查看>>
判断字符串是否为空的注意事项
查看>>
布兰诗歌
查看>>
js编码
查看>>
Pycharm Error loading package list:Status: 403错误解决方法
查看>>
steps/train_sat.sh
查看>>
转:Linux设备树(Device Tree)机制
查看>>
iOS 组件化
查看>>
(转)Tomcat 8 安装和配置、优化
查看>>
(转)Linxu磁盘体系知识介绍及磁盘介绍
查看>>
tkinter布局
查看>>
命令ord
查看>>
Sharepoint 2013搜索服务配置总结(实战)
查看>>
博客盈利请先考虑这七点
查看>>
使用 XMLBeans 进行编程
查看>>
写接口请求类型为get或post的时,参数定义的几种方式,如何用注解(原创)--雷锋...
查看>>
【OpenJ_Bailian - 2287】Tian Ji -- The Horse Racing (贪心)
查看>>
Java网络编程--socket服务器端与客户端讲解
查看>>
List_统计输入数值的各种值
查看>>