1. 入门
CountDownLatch是同步工具类之一,可以指定一个计数值,在并发环境下由线程进行减1操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒,实现线程间的同步。
Java的concurrent包里面的CountDownLatch其实可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有一个线程去减这个计数器里面的值。
你可以向CountDownLatch
对象设置一个初始的数字作为计数值,任何调用这个对象上的await()
方法都会阻塞,直到这个计数器的计数值被其他的线程减为0为止。
CountDownLatch
的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。假如我们这个想要继续往下执行的任务调用一个CountDownLatch
对象的await()
方法,其他的任务执行完自己的任务后调用同一个CountDownLatch
对象上的countDown()
方法,这个调用await()
方法的任务将一直阻塞等待,直到这个CountDownLatch
对象的计数值减到0为止。
举个例子,有三个工人在为老板干活,这个老板有一个习惯,就是当三个工人把一天的活都干完了的时候,他就来检查所有工人所干的活。记住这个条件:三个工人先全部干完活,老板才检查。所以在这里用Java代码设计两个类,Worker代表工人,Boss代表老板,具体的代码实现如下:
Worker1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Worker implements Runnable{
private CountDownLatch downLatch;
private String name;
public Worker(CountDownLatch downLatch, String name){
this.downLatch = downLatch;
this.name = name;
}
public void run() {
this.doWork();
try{
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
}catch(InterruptedException ie){
}
System.out.println(this.name + "活干完了!");
this.downLatch.countDown();
}
private void doWork(){
System.out.println(this.name + "正在干活!");
}
}
Boss1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class Boss implements Runnable {
private CountDownLatch downLatch;
public Boss(CountDownLatch downLatch){
this.downLatch = downLatch;
}
public void run() {
System.out.println("老板正在等所有的工人干完活......");
try {
this.downLatch.await();
} catch (InterruptedException e) {
}
System.out.println("工人活都干完了,老板开始检查了!");
}
}
app
1 | public class CountDownLatchDemo { |
当你运行CountDownLatchDemo这个对象的时候,你会发现是等所有的工人都干完了活,老板才来检查,下面是我本地机器上运行的一次结果,可以肯定的每次运行的结果可能与下面不一样,但老板检查永远是在后面的。
1 | 王二正在干活! |
原文出自:https://zapldy.iteye.com/blog/746458
2. 源码分析
2.1 构造器
CountDownLatch
和ReentrantLock
一样,内部使用Sync继承AQS。构造函数很简单地传递计数值给Sync,并且设置了state。
1 | Sync(int count) { |
上文已经介绍过AQS的state,这是一个由子类决定含义的“状态”。对于ReentrantLock
来说,state是线程获取锁的次数;对于CountDownLatch
来说,则表示计数值的大小。
2.2 阻塞线程
接着来看await
方法,直接调用了AQS的acquireSharedInterruptibly
。
1 | public void await() throws InterruptedException { |
首先尝试获取共享锁,实现方式和独占锁类似,由CountDownLatch
实现判断逻辑。1
2
3protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
返回1代表获取成功,返回-1代表获取失败。如果获取失败,需要调用doAcquireSharedInterruptibly
:
1 | private void doAcquireSharedInterruptibly(int arg) |
doAcquireSharedInterruptibly
的逻辑和独占功能的acquireQueued
基本相同,阻塞线程的过程是一样的。不同之处:
- 1.创建的
Node
是定义成共享的(Node.SHARED)
; - 2.被唤醒后重新尝试获取锁,不只设置自己为
head
,还需要通知其他等待的线程。(重点看后文释放操作里的setHeadAndPropagate
)
2.4 释放操作
1 | public void countDown() { |
countDown
操作实际就是释放锁的操作,每调用一次,计数值减少1:1
2
3
4
5
6
7public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
同样是首先尝试释放锁,具体实现在CountDownLatch
中:1
2
3
4
5
6
7
8
9
10
11protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
死循环加上cas的方式保证state的减1操作,当计数值等于0,代表所有子线程都执行完毕,被await
阻塞的线程可以唤醒了,下一步调用doReleaseShared
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//1
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//2
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
标记1里,头节点状态如果SIGNAL
,则状态重置为0,并调用unparkSuccessor
唤醒下个节点。
标记2里,被唤醒的节点状态会重置成0,在下一次循环中被设置成PROPAGATE状态,代表状态要向后传播。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在唤醒线程的操作里,分成三步:
- 处理当前节点:非
CANCELLED
状态重置为0; - 寻找下个节点:如果是
CANCELLED
状态,说明节点中途溜了,从队列尾开始寻找排在最前还在等着的节点 - 唤醒:利用
LockSupport.unpark
唤醒下个节点里的线程。
线程是在doAcquireSharedInterruptibly
里被阻塞的,唤醒后调用到setHeadAndPropagate
。1
2
3
4
5
6
7
8
9
10
11private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
setHead
设置头节点后,再判断一堆条件,取出下一个节点,如果也是共享类型,进行doReleaseShared
释放操作。下个节点被唤醒后,重复上面的步骤,达到共享状态向后传播。
要注意,await
操作看着好像是独占操作,但它可以在多个线程中调用。当计数值等于0的时候,调用await
的线程都需要知道,所以使用共享锁。
2.5.限定时间的await
1 | CountDownLatch的await方法还有个限定阻塞时间的版本. |
跟踪代码,最后来看doAcquireSharedNanos方法,和上文介绍的doAcquireShared逻辑基本一样,不同之处是加了time字眼的处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
进入方法时,算出能够执行多久的deadline,然后在循环中判断时间。注意到代码中间有句:1
2
3nanosTimeout > spinForTimeoutThreshold
static final long spinForTimeoutThreshold = 1000L;
spinForTimeoutThreshold
写死了1000ns,这就是所谓的自旋操作。当超时在1000ns内,让线程在循环中自旋,否则阻塞线程。
3.总结
下面对上面说的三个辅助类进行一个总结:
1)CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
另外,CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
2)Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限。