1、创建实例 /** * 初始化单例的便捷方法 */
public static void init() { getInstance();}
/** * 获取单例 * @return */
public static ZooKeeperSession getInstance() { return Singleton.getInstance();}
/** * 封装单例的静态内部类 * @author Administrator * */
private static class Singleton { private static ZooKeeperSession instance; static { instance = new ZooKeeperSession(); } public static ZooKeeperSession getInstance() { return instance; } }
zookeeper server,创建会话的时候,是异步去进行的,所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接。
public ZooKeeperSession() { // 去连接zookeeper server,创建会话的时候,是异步去进行的 // 所以要给一个监听器,说告诉我们什么时候才是真正完成了跟zk server的连接 try { this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher()); // 给一个状态CONNECTING,连接中 System.out.println(zookeeper.getState()); try { // CountDownLatch // java多线程并发同步的一个工具类 // 会传递进去一些数字,比如说1,2 ,3 都可以 // 然后await(),如果数字不是0,那么久卡住,等待 // 其他的线程可以调用coutnDown(),减1 // 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态 // 继续向下运行 connectedSemaphore.await(); } catch(InterruptedException e) { e.printStackTrace(); } System.out.println("ZooKeeper session established......"); } catch (Exception e) { e.printStackTrace(); } }
/** * 建立zk session的watcher * @author Administrator * */
private class ZooKeeperWatcher implements Watcher { public void process(WatchedEvent event) { System.out.println("Receive watched event: " + event.getState()); if(KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } }
**********************************************************************************************************
/** * 获取分布式锁 * @param productId */
public void acquireDistributedLock(Long productId) { String path = "/product-lock-" + productId; try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to acquire lock for product[id=" + productId + "]"); } catch (Exception e) { // 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错 // NodeExistsException int count = 0; while(true) { try { Thread.sleep(1000); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e2) { count++; System.out.println("the " + count + " times try to acquire lock for product[id=" + productId + "]......"); continue; } System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......"); break; } } }
/** * 释放掉一个分布式锁 * @param productId */
public void releaseDistributedLock(Long productId) { String path = "/product-lock-" + productId; try { zookeeper.delete(path, -1); System.out.println("release the lock for product[id=" + productId + "]......"); } catch (Exception e) { e.printStackTrace(); } }
**********************************************************************************************************
2、实例应用
// 加代码,在将数据直接写入redis缓存之前,应该先获取一个zk的分布式锁 ZooKeeperSession zkSession = ZooKeeperSession.getInstance(); zkSession.acquireDistributedLock(productId); // 获取到了锁 // 先从redis中获取数据 ProductInfo existedProductInfo = cacheService.getProductInfoFromReidsCache(productId); if(existedProductInfo != null) { // 比较当前数据的时间版本比已有数据的时间版本是新还是旧 try { Date date = sdf.parse(productInfo.getModifiedTime()); Date existedDate = sdf.parse(existedProductInfo.getModifiedTime()); if(date.before(existedDate)) { System.out.println("current date[" + productInfo.getModifiedTime() + "] is before existed date[" + existedProductInfo.getModifiedTime() + "]"); return; } } catch (Exception e) { e.printStackTrace(); } System.out.println("current date[" + productInfo.getModifiedTime() + "] is after existed date[" + existedProductInfo.getModifiedTime() + "]"); } else { System.out.println("existed product info is null......"); } try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cacheService.saveProductInfo2LocalCache(productInfo); System.out.println("===================获取刚保存到本地缓存的商品信息:" + cacheService.getProductInfoFromLocalCache(productId)); cacheService.saveProductInfo2ReidsCache(productInfo); // 释放分布式锁 zkSession.releaseDistributedLock(productId);