Java中的Condition接口实现分析 - Sanarous的博客

Java中的Condition接口实现分析

Condition接口

我们使用 Synchronized 实现“等待/通知”模式时,都是配合每个对象 Object 的监视器方法(wait 和 notify/notifyAll 方法)完成的,但是在显式锁 Lock 中,这种方法就显然不行了。因此显式锁中就引入了 Condition 接口,该接口提供了类似于 Object 的监视器方法,可以配合显式锁 Lock 实现“等待/通知”模式。

该接口位于J.U.C的 locks 包中,这个接口中定义的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public interface Condition {

/**
* 当前线程进入等待状态直到被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回
* 的情况,包括:
* 其它线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒
* (1)其它线程(调用 interrupt() )中断当前线程
* (2)如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所持有的锁
*/
void await() throws InterruptedException;

/**
* 当前线程进入等待状态直到被通知,从方法名称可以看到该方法对中断不敏感
*/
void awaitUninterruptibly();

/**
* 当前线程进入等待状态后直到被通知、中断或者超时,返回值表示剩余的时间,如果在nanoTimeout纳秒
* 之前被唤醒,那么返回值就是(nanosTimeout-实际耗时)。如果返回值是0或者负数,那么可以认定
* 已经超时了
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;

/**
* 当前线程进入等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,方法返回true
* 否则返回false
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;

/**
* 当前线程进入等待状态直到被通知、中断或者到某个时间,如果没有到指定时间就被通知,方法返回true
* 否则返回false
*/
boolean awaitUntil(Date deadline) throws InterruptedException;

/**
* 唤醒一个等待在Condition上的线程,该线程从等待方法中返回前必须获取与Condition相关的锁
*/
void signal();

/**
* 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获取与COndition相关的锁
*/
void signalAll();
}

我们可以看到接口定义中与 Object 监视器方法基本类似,与监视器方法对比如下:

对比项Object 监视器方法Condition
前置条件获取对象的锁调用Lock.lock() 获取锁,调用lock.newCondition()获取Condition对象
调用方式直接调用直接调用
等待队列个数一个多个
当前线程释放锁并进入等待状态支持支持
当前线程释放锁并进入等待状态,在等待状态中不响应中断不支持支持
当前线程释放锁并进入超时等待状态支持支持
当前线程释放锁并进入等待状态到将来某个时间不支持支持
唤醒等待队列中一个线程支持支持
唤醒等待队列中全部线程支持支持

使用示例

编写一个 Java 应用程序,要求有三个进程:student1,student2,teacher,其中线程student1准备“睡”1分钟后再开始上课,线程 student2 准备“睡” 5 分钟后再开始上课。Teacher 在输出 4 句“上课”后,“唤醒”了休眠的线程 student1;线程 student1 被“唤醒”后,负责再“唤醒”休眠的线程 student2。

我们先使用 Synchronized 和 Object 监视器方法实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package com.fantJ.bigdata;

/**
* Created by Fant.J.
* 2018/7/2 16:36
*/
public class Ten {
static class Student1{
private boolean student1Flag = false;
public synchronized boolean isStudent1Flag() {
System.out.println("学生1开始睡觉1min");
if (!this.student1Flag){
try {
System.out.println("学生1睡着了");
wait(1*1000*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生1被唤醒");

return student1Flag;
}

public synchronized void setStudent1Flag(boolean student1Flag) {
this.student1Flag = student1Flag;
notify();
}
}
static class Student2{
private boolean student2Flag = false;
public synchronized boolean isStudent2Flag() {
System.out.println("学生2开始睡觉5min");
if (!this.student2Flag){
try {
System.out.println("学生2睡着了");
wait(5*1000*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生2被唤醒");
return student2Flag;
}

public synchronized void setStudent2Flag(boolean student2Flag) {
notify();
this.student2Flag = student2Flag;
}
}
static class Teacher{
private boolean teacherFlag = true;
public synchronized boolean isTeacherFlag() {
if (!this.teacherFlag){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("老师准备吼着要上课");

return teacherFlag;
}

public synchronized void setTeacherFlag(boolean teacherFlag) {
this.teacherFlag = teacherFlag;
notify();
}
}
public static void main(String[] args) {
Student1 student1 = new Student1();
Student2 student2 = new Student2();
Teacher teacher = new Teacher();

Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0;i<4;i++){
System.out.println("上课");
}
teacher.isTeacherFlag();
System.out.println("学生1被吵醒了,1s后反应过来");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
student1.setStudent1Flag(true);
}
});
Thread s1 = new Thread(new Runnable() {
@Override
public void run() {
student1.isStudent1Flag();
System.out.println("准备唤醒学生2,唤醒需要1s");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
student2.setStudent2Flag(true);
}
});
Thread s2 = new Thread(new Runnable() {
@Override
public void run() {
student2.isStudent2Flag();
}
});

s1.start();
s2.start();
t.start();
}
}

当然,用notifyAll可能会用更少的代码,这种实现方式虽然复杂,单性能上会比使用notifyAll()要强很多,因为没有锁争夺导致的资源浪费。但是可以看到,代码很复杂,实例与实例之间也需要保证很好的隔离。

然后再用 Condition 和 ReentrantLock 实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class xxx{
private int signal = 0;
public Lock lock = new ReentrantLock();
Condition teacher = lock.newCondition();
Condition student1 = lock.newCondition();
Condition student2 = lock.newCondition();

public void teacher(){
lock.lock();
while (signal != 0){
try {
teacher.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("老师叫上课");
signal++;
student1.signal();
lock.unlock();
}
public void student1(){
lock.lock();
while (signal != 1){
try {
student1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生1醒了,准备叫醒学生2");
signal++;
student2.signal();
lock.unlock();
}
public void student2(){
lock.lock();
while (signal != 2){
try {
student2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("学生2醒了");
signal=0;
teacher.signal();
lock.unlock();
}

public static void main(String[] args) {
ThreadCommunicate2 ten = new ThreadCommunicate2();
new Thread(() -> ten.teacher()).start();
new Thread(() -> ten.student1()).start();
new Thread(() -> ten.student2()).start();
}
}

Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是lock.newCondition() 调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在lock.lock()lock.unlock之间才可以使用。

可以观察到,我取消了 Synchronized 方法关键字,在每个加锁的方法前后分别加了lock.lock(); lock.unlock();来获取/释放锁,并且在释放锁之前施放想要施放的 Condition 对象。同样的,我们使用 signal 来完成线程间的通信。

深入理解Condition的使用方法

我们可以利用 Condition 来实现一个有界队列,什么叫有界队列呢?有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class MyQueue<E> {

private Object[] objects;
private Lock lock = new ReentrantLock();
private Condition addCDT = lock.newCondition();
private Condition rmCDT = lock.newCondition();

private int addIndex;
private int rmIndex;
private int queueSize;

MyQueue(int size){
objects = new Object[size];
}

//添加元素
public void add(E e){
lock.lock();
while (queueSize == objects.length){
try {
addCDT.await(); //进入等待状态
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
objects[addIndex] = e;
System.out.println("添加了数据"+"Objects["+addIndex+"] = "+e);
if (++addIndex == objects.length){
addIndex = 0;
}
queueSize++;
rmCDT.signal();
lock.unlock();

}

//删除元素
public Object remove(){
lock.lock();
while (queueSize == 0){
try {
System.out.println("队列为空");
rmCDT.await(); //进入等待状态
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object temp = objects[rmIndex];
objects[rmIndex] = null;
System.out.println("移除了数据"+"Objects["+rmIndex+"] = null");
if (++rmIndex == objects.length){
rmIndex = 0;
}
queueSize--;
addCDT.signal();
lock.unlock();
return temp;
}
public void foreach(E e){
if (e instanceof String){
Arrays.stream(objects).map(obj->{
if (obj == null){
obj = " ";
}
return obj;
}).map(Object::toString).forEach(System.out::println);
}
if (e instanceof Integer){
Arrays.stream(objects).map(obj -> {
if (obj == null ){
obj = 0;
}
return obj;
}).map(object -> Integer.valueOf(object.toString())).forEach(System.out::println);
}
}
}

add 方法就是往队列中添加数据。 remove 是从队列中按 FIFO 移除数据。 foreach 方法是一个观察队列内容的工具方法,很容易看出,它是用来遍历的。

测试方法:

1
2
3
4
5
6
7
8
9
10
    public static void main(String[] args) {
MyQueue<Integer> myQueue = new MyQueue<>(5);
myQueue.add(5);
myQueue.add(4);
myQueue.add(3);
// myQueue.add(2);
// myQueue.add(1);
myQueue.remove();
myQueue.foreach(5);
}
1
2
3
4
5
6
7
8
9
添加了数据Objects[0] = 5
添加了数据Objects[1] = 4
添加了数据Objects[2] = 3
移除了数据Objects[0] = null
0
4
3
0
0

Condition源码分析

由上面两个例子,我们可以基本理解了Conditon的基本方法和作用,以及其简单的应用场景,那么我们肯定好奇这其中的方法是怎么去实现的,下面我们就来一探究竟。

我们从 Lock 的实现类 ReentrantLock 出发,看看里面是如何实现 Condition 的。

在 IDEA 中使用 double shift 打开 ReentrantLock 源码,可以看到 ReentrantLock 内部有一个静态内部类 Sync ,并且 Sync 是继承自 AQS(AbstractQueuedSynchronizer) 的,这与我们之前分析 AQS 原理是一致的,Sync 中有一个 newCondition 方法:

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

可以看到其中 new 了一个 ConditionObject,然后我们使用 Ctrl + ConditionObject 打开其源码,可以看到这个类是属于 AQS 的内部类,在之前讲解 AQS 原理时没有分析其中 ConditionObject 内部实现类也是为了留到此处进行讲解。为什么 Condition 实现类需要放到 AQS 内部,其实想一下就知道,由于 Condition 的操作都需要获取相关联的锁,所以作为同步器的内部实现类也是非常合理的设计。

我们先看 ConditionObject 的实例域:

1
2
3
4
5
6
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

可以看到实例变量中是有两个 Node构成的,标注为 transient 是表示这两个变量不会被序列化。那么 Node 是什么呢?Node 的类型就是 AbstractQueuedSynchronizer.Node,也就是我们在 AQS 原理中讲解的 FIFO 队列,这也说明每个 Condition 对象中都包含着一个这样的等待队列,这个队列就是 Condition 对象实现等待/通知功能的关键。

等待队列

我们先回顾一下这个 FIFO 队列,Node 对应的主要字段有:

  1. waitStatus:等待状态,所有的状态见下面的表格。
  2. prev:前驱节点
  3. next:后继节点
  4. thread:当前节点代表的线程
  5. nextWaiter:Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;在作为等待队列节点使用时,nextWaiter保存后继节点。
状态含义
CANCELLED1当前节点因为超时或中断被取消同步状态获取,该节点进入该状态后不会再变化
SIGNAL-1标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,需要通知后续节点继续运行,每个节点在阻塞前,需要标记其前驱节点的状态为SIGNAL
CONDITION-2标识当前节点是作为等待队列节点使用的
PROPAGATE-3表示下一次共享式同步状态获取将会无条件地被传播下去
00初始状态

一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下:

如上图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。此处更新节点的过程没有使用 CAS 进行保证,这是因为调用 await() 方法的线程必定是获取了锁的线程,也就是通过锁来保证线程安全的。

在 Object 监视器模型上,一个对象拥有一个同步队列和等待队列,但是 Lock 拥有一个同步队列和多个等待队列,其对应关系如下:

如图所示,Condition 的实现是同步器的内部类,因此每个 Condition 实例都能够访问同步器提供的方法,相当于每个 Condition 都拥有所属同步器的引用。

等待

了解了等待队列原理后,我们可以看一下 condition.await() 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();

//构造一个新的等待队列Node加入到队尾
Node node = addConditionWaiter();

//释放当前线程的独占锁,不管重入几次,都把state释放为0
int savedState = fullyRelease(node);
int interruptMode = 0;

//如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//和中断相关,主要是区分两种中断:是在被signal前中断还是在被signal后中断,如果是被signal前就被中断则抛出 InterruptedException,否则执行 Thread.currentThread().interrupt();
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断则直接退出自旋
break;
}
//退出了上面自旋说明当前节点已经在同步队列上,但是当前节点不一定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

该方法就是将当前线程构造成节点并加入到等待队列中,然后释放同步状态,并唤醒同步队列中后继节点,然后当前状态会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态,如果不是通过其它线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出 IntreeuptException。

而如果从队列的角度来看,这个过程如下图所示:

如图所示,同步队列的首节点不会直接加入等待队列,而是通过 addConditionWaiter() 方法把当前线程构造成一个新的节点并将其加入到等待队列中。

通知

调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException(); //如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
Node first = firstWaiter;
if (first != null)
doSignal(first); //通知等待队列队首的节点。
}

调用该方法的前置条件就是当前线程必须获取了锁,可以看到 signal() 方法进行了 isHeldExclusively() 检查,也就是当前线程必须是获取了锁的线程,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
//如果当前节点状态为CONDITION,则将状态改为0准备加入同步队列;如果当前状态不为CONDITION,说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点
int ws = p.waitStatus;
//如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,线程被唤醒后会执行acquireQueued方法,该方法会重新尝试将节点的先驱状态设为SIGNAL并再次park线程;如果当前设置前驱节点状态为SIGNAL成功,那么就不需要马上唤醒线程了,当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

从队列的角度来看,这个过程如下图所示:

而 Condition 中的 signalAll() 方法,相当于对等待队列中每个节点均执行一次 signal() 方法,效果就是将等待队列中所有节点全部移到同步队列中,并唤醒每个节点的线程。

Condition实现等待/通知的本质

总的来说,Condition 的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用 Condition.await() 时,它会执行以下步骤:

  1. 构造一个新的等待队列节点加入到等待队列队尾
  2. 释放锁,也就是将它的同步队列节点从同步队列队首移除
  3. 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用 signal())或被中断
  4. 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用 Condition.signal() 时,它会执行以下操作:

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点 CANCELLED,就尝试唤醒下一个节点;如果再 CANCELLED 则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时 await() 操作的步骤 3 的解锁条件就已经开启了。然后分两种情况讨论:

  1. 如果先驱节点的状态为 CANCELLED ( > 0 ) 或设置先驱节点的状态为 SIGNAL 失败,那么就立即唤醒当前节点对应的线程,此时 await() 方法就会完成步骤 3,进入步骤 4。
  2. 如果成功把先驱节点的状态设置为了 SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候 await() 的步骤 3 才执行完成,而且有很大概率快速完成步骤 4。

总结

如果知道 Object 的等待通知机制,Condition 的使用是比较容易掌握的,因为和 Object 等待通知的使用基本一致。

对 Condition 的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。

之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由 next 指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用 CAS 把 tail 设置为新节点,然后才修改原 tail 的 next 指针到新节点的。因此用 next 向后遍历是不安全的,但是如果在设置新节点为 tail 前,为新节点设置 prev,则可以保证从 tail 往前遍历是安全的。因此要安全的获取一个节点 Node 的下一个节点,先要看 next 是不是 null,如果是 null,还要从 tail 往前遍历看看能不能遍历到 Node。

而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要 prev 指针。

参考文章

  1. 方腾飞 等著 《Java 并发编程的艺术》
  2. Condition (Java 2 Platform SE 6)
  3. Java显式锁学习总结之六:Condition源码分析
  4. Java并发编程 – Condition
如果这篇文章对您很有帮助,不妨
-------------    本文结束  感谢您的阅读    -------------
0%