概述

java并发编程针对锁机制在我看来应该分为两种:jvm语言级别的(synchronized)、基于AQS衍生出来的, 本节总结一下AQS的原理并展示一下衍生出来的工具类的源代码,最后给出一些示例来定制化一些基于AQS的同步工具。

原理

java中的同步组件除了jvm给我们提供的语言级别的同步机制外,剩下的就是通过AQS来实现的了,一般是实现AQS给我们提供的模板方法 来定制化的实现一些锁。AQS的核心包含了:

  • 同步状态管理(volatile类型的state)
  • 同步队列(双向队列)
  • 等待通知 (signal)

这里我们可以看到很像我们在前面线程小节中提到的生产者消费者模型中锁的获取,只不过这里多了一个同步队列, 用于保存获取锁失败的线程,入队的顺序也决定了后续获取锁的顺序!下面我们具体看一下相关的实现。

补图 队列+状态

排他锁

排他锁也叫独占锁,是指一个锁在某一时刻只能被一个线程占有,其他线程只有在持有锁的线程释放之后才可以获取到锁。 我们接下来还是通过代码来演示一下加锁和解锁的流程吧:

加锁

对于排他锁,程序的入口如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire方法就是抢占一个锁的入口,我们看到该方法是final的,这就意味着该方法不可以重写(一个模板方法),具体操作就是下面三段代码:

  • tryAcquire(arg):尝试获取锁
  • addWaiter(Node.EXCLUSIVE):获取失败则将当前线程封装成一个独占的Node并append到队尾直至成功
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg):使线程在等待队列中休息,有机会获取锁(就是上节中提到的LockSupport.unpark操作)的时候再去尝试获取锁,该过程会自旋至获取到锁。

最后我们还看到如果条件为true的话,还会执行一步selfInterrupt,这里解释一下执行该方法的作用。我们可以看到如果第一步获取锁资源成功的话,那也 就不会有后面的操作了,只有在锁获取失败的时候才会让线程进入等待队列进行休息,这里特别强调了一下休息,是因为线程在处于waiting的状态(调用的park操作)的 时候,是没有办法处理中断的,因此如果在waiting的过程中有中断过来,那么线程会抛出一个InterruptException。前面的小节中我们知道了中断的 作用其实是给线程发送一条消息,并不会强制线程中断,而线程抛出异常之后会导致中断标志位复位(中断信号丢失),因此需要在唤醒线程后进行置位操作, 告诉线程,在你睡眠期间有个家伙发了一个中断的信号,至于怎么处理,你自己看着办吧。下面我们从代码的层面上来跟进一下上面的流程吧。

tryAcquire

该方法就是一个模板方法,供子类实现,需要说明的是该方法之所以不是抽象的而是一个空的,是为了方便使用AQS的用户,如果是个抽象的,那么如果用户 想要实现一个共享锁的话,需要重写排他锁才需要重写的方法!

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

addWaiter

当获取锁失败的时候,就会将当前线程封装成一个EXCLUSIVE类型的节点,并添加到队列的尾部,具体过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Node addWaiter(Node mode) {
// 将当前线程构造成一个节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 当前线程获取队尾节点
Node pred = tail;
if (pred != null) {
// 将封装当前线程的节点的前置指针指向位节点,
// 由于tail是一个共享数据,因此多个线程同时入队列的时候,
// 必然会有多个node的前置节点指向尾节点
node.prev = pred;
// 通过cas操作将tail指向当前node
if (compareAndSetTail(pred, node)) {
// 成功的话则将之前的tail节点的后置指针指向当前节点,入队操作就完成了
pred.next = node;
return node;
}
}
// 首次入队失败,则通过自旋的方式进行入队
enq(node);
return node;
}

enq具体代码如下,可以看到该过程是一个自旋的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Node enq(final Node node) {
for (;;) {
// 获取队尾节点
Node t = tail;
if (t == null) { // Must initialize
// 如果队列为空,则尝试初始化队列
if (compareAndSetHead(new Node()))
// 初始化队列成功的话,将tail指向当前节点
// 这里如果有多个线程都进入到初始化队列的话,
// 就只会有一个能够完成队列的初始化操作,
// 因此最终进入这里的线程只会有一个,并不会有多个!
tail = head;
} else {
// 如果队列不是空,并且节点入队失败或者队列为空,但是线程创建队列失败,
// 那么最终肯定会执行到此处,那么自旋的方式设置尾节点就可以了
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

下面通过图示来演示一下获取锁失败后的入队过程: 上图演示的是有多个线程同时竞争锁的情况下,失败的线程入队的情况,由于tail是共享数据, 因此为了保证能够正确的更新tail,这里使用了类似于TSL的原子指令也就是CAS操作,其竞争如下图所示: 最后因为cas操作的原子性,能够成功变成队尾的只有一个节点,其他节点继续在新的tail上竞争,如下图所示: 个人认为上面的过程可以只包含enq(node)的过程。

上面我们看到了抢占不到锁的线程入队的过程,那么入队完了是不是就完事了?并不是!入队完还需要做什么呢? 可以想像一下在医院排队挂号,如果我前面的人没有在办理业务,那么我是不是可以占着这个位子,然后找个小板凳坐下来歇歇呢? 如果我前面的人已经在办理业务了,那又该怎么办才好?

接下来让我们来看一下线程入队之后会做什么操作吧,代码开始于acquireQueued(addWaiter(Node.EXCLUSIVE), arg),内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 注意:方法执行的视角永远不要离开当前的线程,虽然该线程已经被封装到队列的尾部了
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 标记是否被中断,这也是当前方法的返回值,用于在线程恢复的时候对中断位进行置位操作
boolean interrupted = false;
// 自旋的过程,可能会被park程waiting状态,记住当前线程还处于运行的状态,
// 并不是添加到队列的尾部之后就马上处于waiting状态了
for (;;) {
// 如果当前节点的前置节点是head的话,如果等待head去唤醒老二的话,本来是没有问题的,
// 不过如果唤醒的信号丢了怎么办?如果信号重发了又怎么办?
// 简单点,还是让老二自己去抢吧,如果竞争到了就可以把当前的队头给回收掉,抢不到继续抢!

// 还需要注意的就是如果线程不是首次进入这里的话,比如被interrupt,那么也是可能执行到这里的,
// 不过由于被唤醒后检查前置节点发现,前置节点并不是head,那么会再次睡眠!
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果当前节点的前置节点不是队首,或者是队首但是竞争失败的话就进入到此处
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待的过程中没有成功的获取到资源
if (failed)
cancelAcquire(node);
}
}

如果节点不是老二节点,或者节点是老二节点,但是获取锁失败了,那么就会判断当前节点该不该park掉(非老二park掉,老二的话继续抢)。 因此接下来可以看一下shouldParkAfterFailedAcquire方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取当前节点前置节点的状态,节点初始化状态是0,并不是SIGNAL
int ws = pred.waitStatus;
// 如果前置节点的状态时SIGNAL,则表示前面节点代表的线程成功获取到锁的时候会通知后记节点
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前置节点的状态大于0,说明前置节点代表的线程被取消了,因此这里将会执行一个短路操作,
// 把被取消的节点全都去掉,下面的循环操作就是这个意思
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 这里将直接再次进入cas操作
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果前置节点的状态是0(排他锁的默认值) 或者-3(共享锁才会用到,暂不考虑),说明前置节点不具备唤醒的能力
// 那么将前置节点的状态更改为SIGNAL,这里之所以使用cas的操作,可能是其他线程代表的节点把当前线程的前置节点给
// 短路掉了,因此需要用cas操作

// 想一下,如果pred是head节点会发生什么?如果这一步成功的话,就意味着head节点又要unpark老二节点,
// 如果失败了,那刚好又要进入cas的操作了
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

如果当前线程对应的节点的前置节点设置了SIGNAL,当前线程就具备了park的资格了,接下来就是park掉线程,然后返回当前线程是否在waiting 之后收到了interrupt信号,无论是否收到都直接重置,在线程被再次唤醒并正确获取到锁之后通过selfInterrupt进行置位操作。 至此加锁的整个过程已经完结了,这里还是稍微有点复杂,可以多看几遍,如果不懂得话。

解锁

接下来看一下怕他锁的解锁过程,同加锁的过程是一样的,解锁方法的入口也是一个final类型的方法,这也就是说我们没办法重写这个方法:

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

同获取锁的过程tryAcquire一样,AQS给我们提供了tryRelease方法,我们可以通过实现该方法来释放锁,该方法也是空,这也是模板设计模式的体现:

1
2
3
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

因为是排他锁,这里持有锁的线程只可能有一个,因此我们不用考虑并发唤醒的问题,在head存在,并且head的状态非0的情况下,唤醒node节点的后继节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 此处的node为head节点
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 将head节点的状态清除
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 遍历找到第一个符合条件的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// TODO 这里如果s又被另外的入队的线程短路了,不过还没有回收的话,就会出现唤醒一个无效的线程,不会出现问题么?
if (s != null)
// 定向唤醒线程
LockSupport.unpark(s.thread);
}

上面就是排他锁的加锁和解锁的过程,volatile结合cas操作在并发这一块应该是很常见的操作,这里AQS内置了一个volatile类型的state,如果state为0 就会唤醒后继节点。

共享锁

共享锁是多个线程可以同时获取到同一个锁。共享锁一般可以用于限制程序的并发度,这里比较经典的用例就是信号量了。 还是先看一下AQS共享锁的加锁和解锁是什么样子的吧:

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

废话就不多说了,我们通过重写tryAcquireShared(arg)方法尝试获取指定量的资源,获取失败则入队,知道获取到资源位置,这里如果前面的线程获取到的资源 一定会导致state小于0的话,会不会导致后面的封装了线程的节点阻塞???

在获取资源失败的情况下,将会执行doAcquireShared(arg)方法,该方法详细内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void doAcquireShared(int arg) {
// 构造共享节点并加入到队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

ReentrantLock

CountDownLatch

Semaphore

自定义锁

其他问题

cas 操作带来的问题ABA的问题,如何解决。

小节