本文共 6496 字,大约阅读时间需要 21 分钟。
感谢网友trytocatch的投稿
相比上一 篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。
我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。
该工具同时还维护一个版本号,await方法可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。
如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。
因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。
简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。
至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。
以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。
因为较为复杂,下面先给出完整的代码,再讲解其中的关键。
注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一个由调用者维护版本号。
001 | /** |
002 | * @author trytocatch@163.com |
003 | * @time 2013-1-31 |
004 | */ |
005 | public class BoundlessCyclicBarrier { |
006 | protected final AtomicReference<VersionQueue> waitQueueRef; |
007 |
008 | public BoundlessCyclicBarrier() { |
009 | this ( 0 ); |
010 | } |
011 |
012 | public BoundlessCyclicBarrier( int startVersion) { |
013 | waitQueueRef = new AtomicReference<VersionQueue>( new VersionQueue(startVersion)); |
014 | } |
015 |
016 | public final void awaitWithAssignedVersion( int myVersion) |
017 | throws InterruptedException { |
018 | awaitImpl( true , myVersion, 0 ); |
019 | } |
020 |
021 | /** |
022 | * |
023 | * @param myVersion |
024 | * @param nanosTimeout |
025 | * @return if timeout, or be canceled and doesn't reach myVersion, returns false |
026 | * @throws InterruptedException |
027 | */ |
028 | public final boolean awaitWithAssignedVersion( int myVersion, long nanosTimeout) throws InterruptedException { |
029 | return awaitImpl( true , myVersion, nanosTimeout); |
030 | } |
031 |
032 | public final void await() throws InterruptedException { |
033 | awaitImpl( false , 0 , 0 ); |
034 | } |
035 |
036 | /** |
037 | * |
038 | * @param nanosTimeout |
039 | * @return if and only if timeout, returns false |
040 | * @throws InterruptedException |
041 | */ |
042 | public final boolean await( long nanosTimeout) |
043 | throws InterruptedException { |
044 | return awaitImpl( false , 0 , nanosTimeout); |
045 | } |
046 |
047 | /** |
048 | * pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier) |
049 | * @return old queue version |
050 | */ |
051 | public int nextCycle() { |
052 | VersionQueue oldQueue = waitQueueRef.get(); |
053 | VersionQueue newQueue = new VersionQueue(oldQueue.version + 1 ); |
054 | for (;;){ |
055 | if (waitQueueRef.compareAndSet(oldQueue, newQueue)) { |
056 | for (Thread t : oldQueue.queue) |
057 | LockSupport.unpark(t); |
058 | break ; |
059 | } |
060 | oldQueue = waitQueueRef.get(); |
061 | newQueue.version = oldQueue.version + 1 ; |
062 | } |
063 | return oldQueue.version; |
064 | } |
065 |
066 | /** |
067 | * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right) |
068 | * @param newAssignVersion |
069 | */ |
070 | public void nextCycle( int newAssignVersion) { |
071 | VersionQueue oldQueue = waitQueueRef.getAndSet( new VersionQueue(newAssignVersion)); |
072 | for (Thread t : oldQueue.queue) |
073 | LockSupport.unpark(t); |
074 | } |
075 |
076 | /** |
077 | * if version update has stopped, invoke this to awake all threads |
078 | */ |
079 | public void cancel(){ |
080 | VersionQueue oldQueue = waitQueueRef.get(); |
081 | if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true ))) { |
082 | for (Thread t : oldQueue.queue) |
083 | LockSupport.unpark(t); |
084 | } |
085 |
086 | public final int getVersion() { |
087 | return waitQueueRef.get().version; |
088 | } |
089 |
090 | private static final class VersionQueue { |
091 | final private ConcurrentLinkedQueue queue; |
092 | int version; |
093 | final boolean isCancelQueue; |
094 |
095 | VersionQueue( int curVersion){ |
096 | this (curVersion, false ); |
097 | } |
098 |
099 | VersionQueue( int curVersion, boolean isCancelQueue) { |
100 | this .version = curVersion; |
101 | this .isCancelQueue = isCancelQueue; |
102 | queue = new ConcurrentLinkedQueue(); |
103 | } |
104 | } |
105 |
106 | /** |
107 | * |
108 | * @param assignVersion is myVersion available |
109 | * @param myVersion wait for this version |
110 | * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid) * @return if timeout, or be canceled and doesn't reach myVersion, returns false * @throws InterruptedException */ protected boolean awaitImpl( boolean assignVersion, int myVersion, long nanosTimeout) throws InterruptedException { boolean timeOutEnable = nanosTimeout > 0 ; |
111 | long lastTime = System.nanoTime(); |
112 | VersionQueue newQueue = waitQueueRef.get(); //A |
113 | if (assignVersion && newQueue.version - myVersion >= 0 ) |
114 | return true ; |
115 | while ( true ) { |
116 | VersionQueue submitQueue = newQueue; //B |
117 | submitQueue.queue.add(Thread.currentThread()); //C |
118 | while ( true ) { |
119 | newQueue = waitQueueRef.get(); //D |
120 | if (newQueue != submitQueue){ //E: it's a new cycle |
121 | if (assignVersion == false ) |
122 | return true ; |
123 | else if (newQueue.version - myVersion >= 0 ) |
124 | return true ; |
125 | else if (newQueue.isCancelQueue) //F: be canceled |
126 | return false ; |
127 | else //just like invoking awaitImpl again |
128 | break ; |
129 | } |
130 | if (timeOutEnable) { |
131 | if (nanosTimeout <= 0 ) |
132 | return false ; |
133 | LockSupport.parkNanos( this , nanosTimeout); |
134 | long now = System.nanoTime(); |
135 | nanosTimeout -= now - lastTime; |
136 | lastTime = now; |
137 | } else |
138 | LockSupport.park( this ); |
139 | if (Thread.interrupted()) |
140 | throw new InterruptedException(); |
141 | } |
142 | } |
143 | } |
144 | } |
先分析一下awaitImpl方法,A和D是该方法的关键点,决定着它属于哪一个批次,对应哪一个版本。这里有个小细节,在nexeCycle,cancel解除阻塞时,该线程可能并不在队列中,因为插入队列发生在C处,这在A和D之后(虽然看起来C在D之前,但D取到的queue要在下一次循环时才被当作submitQueue),所以,在E处再进行了一次判断,开始解除阻塞时,旧队列肯定被新队列所替换,newQueue != submitQueue一定为真,就会不调用park进行阻塞了,也就不需要解除阻塞,所以即使解除阻塞时,该线程不在队列中也是没问题的。
再看E处,当进入一个新的cycle时(当前队列与提交的队列不同),a)如果没指定版本,或者到达或超过了指定版本,则返回true;b)如果当前调用了cancel,则当前队列的isCancelQueue将为true,则不继续傻等,返回false;c)或者还未到达指定版本,break,插入到当前队列中,继续等待指定版本的到达。
如果没有进入E处的IF内,则当前线程会被阻塞,直到超时,然后返回false;或被中断,然后抛出InterruptedException;或被解除阻塞,重新进行E处的判定。
这里还有个小细节,既然cancel时,把当前的队列设置了isCancelQueue,那么之后指定版本的await会不会也直接返回了呢?其实不会的,因为它若要执行F处的判断,则先必需通过E处的判定,这意味着,当前队列已经不是提交时的那个设置了isCancelQueue的队列了。
代码中对于cancel的处理,其实并不保证cancel后,之前的await都会被解除阻塞并返回,如果cancel后,紧接着又调用了nextCycle,那么可能某线程感知不到cancel的调用,唤醒后又继续等待指定的版本。cancel的目的是在于不让线程傻等,既然恢复版本更新了,那就继续等待吧。
如果自己维护版本号,则应该保证递增。另外,版本号的设计,考虑到了int溢出的情况,版本的前后判断,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,这样,版本号就相当于循环使用了,只要两个比较的版本号的差不超过int的最大值,那么都是正确的,int的最大值可是20多亿,几乎不可能出现跨度这么大的两个版本号的比较,所以,认为它是正确的。
本文讲到了一个非阻塞同步算法设计时的小技巧,如果多个变量之间要维护某种特定关系,那么可以将它们封装到一个类中,再用CAS更新这个类的引用,这样就达到了:要么都被更新,要么都没被更新,保持了多个变量之间的一致性。同时需要注意的是,每次更新都必需创建新的包装对象,假如有其它更好的办法,应该避免使用该方法。
转载地址:http://hfdlo.baihongyu.com/