博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
非阻塞同步算法实战(二)-BoundlessCyclicBarrier
阅读量:6705 次
发布时间:2019-06-25

本文共 6496 字,大约阅读时间需要 21 分钟。

感谢网友trytocatch的投稿

前言

相比上一 篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。

需求介绍

我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。

该工具同时还维护一个版本号,await方法可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。

如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。

因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。

问题分析

简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。

至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。

解决方案

以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。

因为较为复杂,下面先给出完整的代码,再讲解其中的关键。

注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一个由调用者维护版本号。

001 /**
002  * @author trytocatch@163.com
003  * @time 2013-1-31
004  */
005 public class BoundlessCyclicBarrier {
006     protected final AtomicReference<VersionQueue> waitQueueRef;
007  
008     public BoundlessCyclicBarrier() {
009         this(0);
010     }
011  
012     public BoundlessCyclicBarrier(int startVersion) {
013         waitQueueRef = new AtomicReference<VersionQueue>(new VersionQueue(startVersion));
014     }
015  
016     public final void awaitWithAssignedVersion(int myVersion)
017             throws InterruptedException {
018         awaitImpl(true, myVersion, 0);
019     }
020  
021     /**
022      *
023      * @param myVersion
024      * @param nanosTimeout
025      * @return if timeout, or be canceled and doesn't reach myVersion, returns false
026      * @throws InterruptedException
027      */
028     public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {
029         return awaitImpl(true, myVersion, nanosTimeout);
030     }
031  
032     public final void await() throws InterruptedException {
033         awaitImpl(false, 0, 0);
034     }
035  
036     /**
037      *
038      * @param nanosTimeout
039      * @return if and only if timeout, returns false
040      * @throws InterruptedException
041      */
042     public final boolean await(long nanosTimeout)
043             throws InterruptedException {
044         return awaitImpl(false, 0, nanosTimeout);
045     }
046  
047     /**
048      * pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier)
049      * @return old queue version
050      */
051     public int nextCycle() {
052         VersionQueue oldQueue = waitQueueRef.get();
053         VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);
054         for(;;){
055             if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {
056                 for (Thread t : oldQueue.queue)
057                     LockSupport.unpark(t);
058                 break;
059             }
060             oldQueue = waitQueueRef.get();
061             newQueue.version = oldQueue.version + 1;
062         }
063         return oldQueue.version;
064     }
065  
066     /**
067      * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)
068      * @param newAssignVersion
069      */
070     public void nextCycle(int newAssignVersion) {
071         VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));
072         for (Thread t : oldQueue.queue)
073             LockSupport.unpark(t);
074     }
075  
076     /**
077      * if version update has stopped, invoke this to awake all threads
078      */
079     public void cancel(){
080         VersionQueue oldQueue = waitQueueRef.get();
081         if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {
082             for (Thread t : oldQueue.queue)
083                 LockSupport.unpark(t);
084     }
085  
086     public final int getVersion() {
087         return waitQueueRef.get().version;
088     }
089  
090     private static final class VersionQueue {
091         final private ConcurrentLinkedQueue queue;
092         int version;
093         final boolean isCancelQueue;
094  
095         VersionQueue(int curVersion){
096             this(curVersion, false);
097         }
098  
099         VersionQueue(int curVersion, boolean isCancelQueue) {
100             this.version = curVersion;
101             this.isCancelQueue = isCancelQueue;
102             queue = new ConcurrentLinkedQueue();
103         }
104     }
105  
106     /**
107      *
108      * @param assignVersion is myVersion available
109      * @param myVersion wait for this version
110      * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid)      * @return if timeout, or be canceled and doesn't reach myVersion, returns false      * @throws InterruptedException      */     protected boolean awaitImpl(boolean assignVersion, int myVersion,             long nanosTimeout) throws InterruptedException {         boolean timeOutEnable = nanosTimeout > 0;
111         long lastTime = System.nanoTime();
112         VersionQueue newQueue = waitQueueRef.get();//A
113         if (assignVersion && newQueue.version - myVersion >= 0)
114             return true;
115         while (true) {
116             VersionQueue submitQueue = newQueue;//B
117             submitQueue.queue.add(Thread.currentThread());//C
118             while (true) {
119                 newQueue = waitQueueRef.get();//D
120                 if (newQueue != submitQueue){
//E: it's a new cycle
121                     if(assignVersion == false)
122                         return true;
123                     else if(newQueue.version - myVersion >= 0)
124                         return true;
125                     else if (newQueue.isCancelQueue)//F: be canceled
126                         return false;
127                     else//just like invoking awaitImpl again
128                         break;
129                 }
130                 if (timeOutEnable) {
131                     if (nanosTimeout <= 0)
132                         return false;
133                     LockSupport.parkNanos(this, nanosTimeout);
134                     long now = System.nanoTime();
135                     nanosTimeout -= now - lastTime;
136                     lastTime = now;
137                 } else
138                     LockSupport.park(this);
139                 if (Thread.interrupted())
140                     throw new InterruptedException();
141             }
142         }
143     }
144 }

代码分析

先分析一下awaitImpl方法,A和D是该方法的关键点,决定着它属于哪一个批次,对应哪一个版本。这里有个小细节,在nexeCycle,cancel解除阻塞时,该线程可能并不在队列中,因为插入队列发生在C处,这在A和D之后(虽然看起来C在D之前,但D取到的queue要在下一次循环时才被当作submitQueue),所以,在E处再进行了一次判断,开始解除阻塞时,旧队列肯定被新队列所替换,newQueue != submitQueue一定为真,就会不调用park进行阻塞了,也就不需要解除阻塞,所以即使解除阻塞时,该线程不在队列中也是没问题的。

再看E处,当进入一个新的cycle时(当前队列与提交的队列不同),a)如果没指定版本,或者到达或超过了指定版本,则返回true;b)如果当前调用了cancel,则当前队列的isCancelQueue将为true,则不继续傻等,返回false;c)或者还未到达指定版本,break,插入到当前队列中,继续等待指定版本的到达。

如果没有进入E处的IF内,则当前线程会被阻塞,直到超时,然后返回false;或被中断,然后抛出InterruptedException;或被解除阻塞,重新进行E处的判定。

这里还有个小细节,既然cancel时,把当前的队列设置了isCancelQueue,那么之后指定版本的await会不会也直接返回了呢?其实不会的,因为它若要执行F处的判断,则先必需通过E处的判定,这意味着,当前队列已经不是提交时的那个设置了isCancelQueue的队列了。

代码中对于cancel的处理,其实并不保证cancel后,之前的await都会被解除阻塞并返回,如果cancel后,紧接着又调用了nextCycle,那么可能某线程感知不到cancel的调用,唤醒后又继续等待指定的版本。cancel的目的是在于不让线程傻等,既然恢复版本更新了,那就继续等待吧。

如果自己维护版本号,则应该保证递增。另外,版本号的设计,考虑到了int溢出的情况,版本的前后判断,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,这样,版本号就相当于循环使用了,只要两个比较的版本号的差不超过int的最大值,那么都是正确的,int的最大值可是20多亿,几乎不可能出现跨度这么大的两个版本号的比较,所以,认为它是正确的。

小结

本文讲到了一个非阻塞同步算法设计时的小技巧,如果多个变量之间要维护某种特定关系,那么可以将它们封装到一个类中,再用CAS更新这个类的引用,这样就达到了:要么都被更新,要么都没被更新,保持了多个变量之间的一致性。同时需要注意的是,每次更新都必需创建新的包装对象,假如有其它更好的办法,应该避免使用该方法。

转载地址:http://hfdlo.baihongyu.com/

你可能感兴趣的文章
Node.js 8.9 LTS版本发布
查看>>
CodeOne 主题演讲:Java,未来已来
查看>>
一文带你快速读懂.NET CLI
查看>>
Alluxio在多级分布式缓存系统中的应用
查看>>
百度云BaaS体系揭秘,突破共识机制、单机计算和串行处理三大瓶颈
查看>>
Mozilla公布WebVR API标准草案
查看>>
到底什么才是业务架构?
查看>>
基础设施即代码:Terraform和AWS无服务器
查看>>
敏捷的忠实拥护者David Hussman于8月18日去世
查看>>
新书问答:Software Wasteland
查看>>
Atlassian发布事故管理解决方案Jira Ops
查看>>
书评 —— 《Go语言编程》
查看>>
Apache HBase的现状和发展
查看>>
反模式的经典 - Mockito设计解析
查看>>
Zip Slip目录遍历漏洞已影响多个Java项目
查看>>
独家揭秘:微博深度学习平台如何支撑4亿用户愉快吃瓜?
查看>>
Visual Studio 15.7预览版4改进Git、C++支持
查看>>
全新云服务:亚马逊AWS发布AWS Ground Station\n
查看>>
微软宣布支持基于虚拟机的Azure IOT Edge服务
查看>>
来自社区的Visual Studio Code使用体验和教程
查看>>