是在zK分布式锁的基础上优化,之前是所有线程是对同一个节点进行监视,只要释放锁,就会通知其他所有线程。高并发容易出现资源竞争激烈情况。优化是在一个线程只关心自己的上一个节点锁,释放锁的时候只会通知一个线程取获取锁,这样减少了资源的竞争。
如果想看上一篇博客,可点击。这2篇博客都是写ZK分布式锁的,之间没有依赖关系,这边只是对上一篇的另一种实现,在性能上有了一定的提升,如果想学习一下分布式锁的思想可都看一下,必定会为你有一个很大的提升。
废话不多说了,直接上源码实现:
1、Maven项目的依赖包
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.9</version>
</dependency>
2、分布式锁的接口定义
//分布式锁定义
public interface IDistriLock {
public void Lock();
public void UnLock();
}
3、分布式锁的抽象类定义
package com.hongying.distrilock;
import org.I0Itec.zkclient.ZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ZK分布式锁实现
*
* @author aaa 在永久节点下创建临时顺序节点,判断临时顺序节点是否最小的节点,是:获取锁;
* 否:对上一个节点进行监听,上一个节点删除(释放锁),当前节点则获取锁。
*/
public abstract class ZKDisAbstracLock implements IDistriLock {
Logger log=LoggerFactory.getLogger(ZKDisAbstracLock.class);
String zkServers = "127.0.0.1:2181";
int connectionTimeout = 30000;
int sessionTimeout=30000;
String DistrickPath = "/DistrictLock";// 分布式锁节点
String SubDistrictPath = "/DistrictLock/SubDistrictLock";// 分布式锁子节点锁顺序节点
ZkClient zk = new ZkClient(zkServers, sessionTimeout,connectionTimeout);
Object lockObj=new Object();//创建父节点时的锁对象
String currentThreadPath;//当前线程创建的临时节点Path
public ZKDisAbstracLock() {
// 未创建父分布式节点则创建
synchronized (lockObj) {
if (!zk.exists(DistrickPath)) {
zk.createPersistent(DistrickPath);
}
}
}
@Override
public void Lock() {
if (TryLock()) {
log.info("线程:"+Thread.currentThread().getName()+",获取分布式锁成功..");
} else {
//线程等待释放锁之后重新获取锁
WaitLock();
Lock();
}
}
@Override
public void UnLock() {
//直接关闭ZK连接,ZK并不能立马释放临时节点,为了提高性能,手动删除临时节点
if(currentThreadPath!=null){
zk.delete(currentThreadPath);
}
zk.close();//关闭ZK则临时节点自动释放锁
log.info("线程:"+Thread.currentThread().getName()+",关闭ZK连接");
}
//试着获取锁,异常则获取锁失败
public abstract boolean TryLock();
//线程等待释放锁
public abstract void WaitLock();
}
4、ZK分布式锁的实现
package com.hongying.distrilock;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.I0Itec.zkclient.IZkDataListener;
public class ZKDistriLockImp extends ZKDisAbstracLock {
CountDownLatch latch=null;
@Override
public boolean TryLock() {
try{
if(currentThreadPath==null){
//未创建临时顺序节点则创建
String seqNode=zk.createEphemeralSequential(SubDistrictPath, "1");
log.info("线程:"+Thread.currentThread().getName()+",创建的节点:"+seqNode);
currentThreadPath=seqNode;
}
//判断当前节点名称是否最小节点,是则获取锁,否则对上一个节点添加监听后等待锁释放
List<String> subNodes=zk.getChildren(DistrickPath);
subNodes.sort(new Comparator<String>(){
@Override
public int compare(String o1, String o2) {
if(o1.isEmpty()||o2.isEmpty()){
return -1;
}if(o1.length() > o2.length()){
return 1;
}
if(o1.length() < o2.length()){
return -1;
}
if(o1.compareTo(o2) > 0){
return 1;
}
if(o1.compareTo(o2) < 0){
return -1;
}
if(o1.compareTo(o2) == 0){
return 0;
}
return 0;
}
});
//排序后的节点打印出来
// for(String n:subNodes){
// log.info("线程:"+Thread.currentThread().getName()+",排序后节点:"+n);
// }
//判断当前节点是否最小节点
String prevNode="";//上一个节点
boolean isMinNode=false;
for(int i=0;i<subNodes.size();i++){
if(currentThreadPath.equals(DistrickPath+"/"+subNodes.get(0))){
//第一个就是这个节点表示最小,直接返回
isMinNode=true;
break;
}else{
//不是最小节点找到上一个节点,作为监听的节点
if(currentThreadPath.equals(DistrickPath+"/"+subNodes.get(i))){
prevNode=DistrickPath+"/"+subNodes.get(i-1);
}
}
}
if(isMinNode){
log.info("线程:"+Thread.currentThread().getName()+",成功获取锁..");
return true;
}
else{
//注意:这里判断节点是否存在非常重要,在并发下,当获取上一个要监视的节点,如果在添加监视的同时,删除了这个节点。
//这个节点本身删除了,这个通知永远不会收到通知,导致后续对这个节点的监视永远不会触发锁释放
if(!zk.exists(prevNode)){
//当前节点不存在了则重新获取要监视的节点
return TryLock();
}
latch=new CountDownLatch(1);//如果当前线程未获取锁,则需要休眠
log.info("线程:"+Thread.currentThread().getName()+",对节点:"+prevNode+",进行监视");
//对当前的上一个节点进行监听
zk.subscribeDataChanges(prevNode, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
log.info("线程:"+Thread.currentThread().getName()+",节点已释放,下个节点可获取锁...");
latch.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
int i=10;
}
});
return false;
}
}
catch(Exception e){
e.printStackTrace();
log.info("试着获取锁异常:",e.getMessage());
//如果发生异常重新试着获取锁
return TryLock();
}
}
@Override
public void WaitLock() {
try {
log.info("线程:"+Thread.currentThread().getName()+ ",已经休眠...");
if(latch!=null){
latch.await();//线程等待,直到监视的节点删除了,释放锁时候
}
log.info("线程:"+Thread.currentThread().getName()+",被唤醒..");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5、分布式锁的测试案例
package com.hongying.distrilock;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OrderServiceTest implements Runnable {
private static Logger logger = LoggerFactory.getLogger(OrderServiceTest.class);
private static int count = 700;//并发线程数量。缺点:线程过多或业务执行时间长,连接ZK Session超时。
private static CountDownLatch cdl = new CountDownLatch(count);
IDistriLock lock = new ZKDistriLockImp();
public void run() {
//创建订单
createOrderNum();
}
public void createOrderNum() {
lock.Lock();
String orderNum = OrderFactory.GetOrder();
logger.info(Thread.currentThread().getName() + "创建了订单号:【" + orderNum
+ "】!");
try {
Thread.sleep(15);//时间长会导致大部分线程都休眠,但是都会连接着ZK,导致连接Session过长失去ZK连接,爆出异常
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.UnLock();
}
public static void main(String[] args) {
for (int i = 0; i < count; i++) {
new Thread(new OrderServiceTest()).start();
//发令枪里面的数字减一
cdl.countDown();
}
}
}
6、总结
以上是自己对ZK分布式锁的另外一种优化,个人采用以上的简单并发测试没有任何问题。但是从代码的逻辑来看当并发量上1000后,业务逻辑复杂,每个线程对锁的占用时间增长,ZK的连接数量急剧上升,会占用大量的ZK连接,会出现ZK连接不上等异常信息出现,这个就需要进一步优化对ZK的连接了。
小弟不才,以上只是自己对ZK的理解并实现。如有任何分歧或问题都可以留言,进一步学习提升。