并发编程之AQS
概述
java并发编程针对锁机制在我看来应该分为两种:jvm语言级别的(synchronized)、基于AQS衍生出来的, 本节总结一下AQS的原理并展示一下衍生出来的工具类的源代码,最后给出一些示例来定制化一些基于AQS的同步工具。
原理
java中的同步组件除了jvm给我们提供的语言级别的同步机制外,剩下的就是通过AQS来实现的了,一般是实现AQS给我们提供的模板方法 来定制化的实现一些锁。AQS的核心包含了:
- 同步状态管理(volatile类型的state)
- 同步队列(双向队列)
- 等待通知 (signal)
这里我们可以看到很像我们在前面线程小节中提到的生产者消费者模型中锁的获取,只不过这里多了一个同步队列, 用于保存获取锁失败的线程,入队的顺序也决定了后续获取锁的顺序!下面我们具体看一下相关的实现。
补图 队列+状态
排他锁
排他锁也叫独占锁,是指一个锁在某一时刻只能被一个线程占有,其他线程只有在持有锁的线程释放之后才可以获取到锁。 我们接下来还是通过代码来演示一下加锁和解锁的流程吧:
加锁
对于排他锁,程序的入口如下:
1 | public final void acquire(int arg) { |
acquire方法就是抢占一个锁的入口,我们看到该方法是final的,这就意味着该方法不可以重写(一个模板方法),具体操作就是下面三段代码:
- tryAcquire(arg):尝试获取锁
- addWaiter(Node.EXCLUSIVE):获取失败则将当前线程封装成一个独占的Node并append到队尾直至成功
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg):使线程在等待队列中休息,有机会获取锁(就是上节中提到的LockSupport.unpark操作)的时候再去尝试获取锁,该过程会自旋至获取到锁。
最后我们还看到如果条件为true的话,还会执行一步selfInterrupt,这里解释一下执行该方法的作用。我们可以看到如果第一步获取锁资源成功的话,那也 就不会有后面的操作了,只有在锁获取失败的时候才会让线程进入等待队列进行休息,这里特别强调了一下休息,是因为线程在处于waiting的状态(调用的park操作)的 时候,是没有办法处理中断的,因此如果在waiting的过程中有中断过来,那么线程会抛出一个InterruptException。前面的小节中我们知道了中断的 作用其实是给线程发送一条消息,并不会强制线程中断,而线程抛出异常之后会导致中断标志位复位(中断信号丢失),因此需要在唤醒线程后进行置位操作, 告诉线程,在你睡眠期间有个家伙发了一个中断的信号,至于怎么处理,你自己看着办吧。下面我们从代码的层面上来跟进一下上面的流程吧。
tryAcquire
该方法就是一个模板方法,供子类实现,需要说明的是该方法之所以不是抽象的而是一个空的,是为了方便使用AQS的用户,如果是个抽象的,那么如果用户 想要实现一个共享锁的话,需要重写排他锁才需要重写的方法!
1 | protected boolean tryAcquire(int arg) { |
addWaiter
当获取锁失败的时候,就会将当前线程封装成一个EXCLUSIVE类型的节点,并添加到队列的尾部,具体过程如下:
1 | private Node addWaiter(Node mode) { |
enq具体代码如下,可以看到该过程是一个自旋的操作:
1 | private Node enq(final Node node) { |
下面通过图示来演示一下获取锁失败后的入队过程:
上图演示的是有多个线程同时竞争锁的情况下,失败的线程入队的情况,由于tail是共享数据,
因此为了保证能够正确的更新tail,这里使用了类似于TSL的原子指令也就是CAS操作,其竞争如下图所示:
最后因为cas操作的原子性,能够成功变成队尾的只有一个节点,其他节点继续在新的tail上竞争,如下图所示:
个人认为上面的过程可以只包含enq(node)的过程。
上面我们看到了抢占不到锁的线程入队的过程,那么入队完了是不是就完事了?并不是!入队完还需要做什么呢? 可以想像一下在医院排队挂号,如果我前面的人没有在办理业务,那么我是不是可以占着这个位子,然后找个小板凳坐下来歇歇呢? 如果我前面的人已经在办理业务了,那又该怎么办才好?
接下来让我们来看一下线程入队之后会做什么操作吧,代码开始于acquireQueued(addWaiter(Node.EXCLUSIVE), arg),内容如下:
1 | /** |
如果节点不是老二节点,或者节点是老二节点,但是获取锁失败了,那么就会判断当前节点该不该park掉(非老二park掉,老二的话继续抢)。 因此接下来可以看一下shouldParkAfterFailedAcquire方法:
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
如果当前线程对应的节点的前置节点设置了SIGNAL,当前线程就具备了park的资格了,接下来就是park掉线程,然后返回当前线程是否在waiting 之后收到了interrupt信号,无论是否收到都直接重置,在线程被再次唤醒并正确获取到锁之后通过selfInterrupt进行置位操作。 至此加锁的整个过程已经完结了,这里还是稍微有点复杂,可以多看几遍,如果不懂得话。
解锁
接下来看一下怕他锁的解锁过程,同加锁的过程是一样的,解锁方法的入口也是一个final类型的方法,这也就是说我们没办法重写这个方法:
1 | public final boolean release(int arg) { |
同获取锁的过程tryAcquire一样,AQS给我们提供了tryRelease方法,我们可以通过实现该方法来释放锁,该方法也是空,这也是模板设计模式的体现:
1 | protected boolean tryRelease(int arg) { |
因为是排他锁,这里持有锁的线程只可能有一个,因此我们不用考虑并发唤醒的问题,在head存在,并且head的状态非0的情况下,唤醒node节点的后继节点:
1 | /** |
上面就是排他锁的加锁和解锁的过程,volatile结合cas操作在并发这一块应该是很常见的操作,这里AQS内置了一个volatile类型的state,如果state为0 就会唤醒后继节点。
共享锁
共享锁是多个线程可以同时获取到同一个锁。共享锁一般可以用于限制程序的并发度,这里比较经典的用例就是信号量了。 还是先看一下AQS共享锁的加锁和解锁是什么样子的吧:
1 | public final void acquireShared(int arg) { |
废话就不多说了,我们通过重写tryAcquireShared(arg)方法尝试获取指定量的资源,获取失败则入队,知道获取到资源位置,这里如果前面的线程获取到的资源 一定会导致state小于0的话,会不会导致后面的封装了线程的节点阻塞???
在获取资源失败的情况下,将会执行doAcquireShared(arg)方法,该方法详细内容如下:
1 | private void doAcquireShared(int arg) { |
ReentrantLock
CountDownLatch
Semaphore
自定义锁
其他问题
cas 操作带来的问题ABA的问题,如何解决。