首页 技术 正文
技术 2022年11月11日
0 收藏 577 点赞 3,456 浏览 7917 个字

一行一行源码分析清楚 AbstractQueuedSynchronizer (三)

转自:https://javadoop.com/post/AbstractQueuedSynchronizer-3

这篇文章是 AQS 系列的最后一篇,第一篇,我们通过 ReentrantLock 公平锁分析了 AQS 的核心,第二篇的重点是把 Condition 说明白,同时也说清楚了对于线程中断的使用。

这篇,我们的关注点是 AQS 最后的部分,共享模式的使用。有前两篇文章的铺垫,剩下的源码分析将会简单很多。

本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier、Semaphore 的源码一起过一下。

老规矩:不放过任何一行代码,没有任何糊弄,没有任何瞎说。

  • AQS的共享模式

在讲CountDownLatch之前,先让我们了解一下什么是AQS中的共享模式和共享锁。

深入浅出AQS之共享锁模式

原文地址:http://www.jianshu.com/p/1161…

搞清楚AQS独占锁的实现原理之后,再看共享锁的实现原理就会轻松很多。两种锁模式之间很多通用的地方本文只会简单说明一下,就不在赘述了,具体细节可以参考我的上篇文章。

一、执行过程概述

获取锁的过程:

  1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
  2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
  3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

释放锁过程:

  1. 当线程调用releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。

二、源码深入分析

基于上面所说的共享锁执行流程,我们接下来看下源码实现逻辑:
首先来看下获取锁的方法acquireShared(),如下

   public final void acquireShared(int arg) {
//尝试获取共享锁,返回值小于0表示获取失败
if (tryAcquireShared(arg) < 0)
//执行获取锁失败以后的方法
doAcquireShared(arg);
}

这里tryAcquireShared()方法是留给用户去实现具体的获取锁逻辑的。关于该方法的实现有两点需要特别说明:

一、该方法必须自己检查当前上下文是否支持获取共享锁,如果支持再进行获取。

二、该方法返回值是个重点。其一、由上面的源码片段可以看出返回值小于0表示获取锁失败,需要进入等待队列。其二、如果返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。最后、如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁。

有了上面的约定,我们再来看下doAcquireShared方法的实现:

    //参数不多说,就是传给acquireShared()的参数
private void doAcquireShared(int arg) {
//添加等待节点的方法跟独占锁一样,唯一区别就是节点类型变为了共享型,不再赘述
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//表示前面的节点已经获取到锁,自己会尝试获取锁
if (p == head) {
int r = tryAcquireShared(arg);
//注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
if (r >= 0) {
//这里是重点,获取到锁以后的唤醒操作,后面详细说
setHeadAndPropagate(node, r);
p.next = null;
//如果是因为中断醒来则设置中断标记位
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//挂起逻辑跟独占锁一样,不再赘述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//获取失败的取消逻辑跟独占锁一样,不再赘述
if (failed)
cancelAcquire(node);
}
}

独占锁模式获取成功以后设置头结点然后返回中断状态,结束流程。而共享锁模式获取成功以后,调用了setHeadAndPropagate方法,从方法名就可以看出除了设置新的头结点以外还有一个传递动作,一起看下代码:

    //两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; //记录当前头节点
//设置新的头节点,即把当前获取到锁的节点设置为头节点
//注:这里是获取到锁之后的操作,不需要并发控制
setHead(node);
//这里意思有两种情况是需要执行唤醒操作
//1.propagate > 0 表示调用方指明了后继节点需要被唤醒
//2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒
//这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
if (s == null || s.isShared())
//后面详细说
doReleaseShared();
}
} private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

最终的唤醒操作也很复杂,专门拿出来分析一下:
注:这个唤醒操作在releaseShare()方法里也会调用。

private void doReleaseShared() {
for (;;) {
//唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
//其实就是唤醒上面新获取到共享锁的节点的后继节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
//这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//如果头结点没有发生变化,表示设置完成,退出循环
//如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
if (h == head)
break;
}
}

接下来看下释放共享锁的过程:

public final boolean releaseShared(int arg) {
//尝试释放共享锁
if (tryReleaseShared(arg)) {
//唤醒过程,详情见上面分析
doReleaseShared();
return true;
}
return false;
}

注:上面的setHeadAndPropagate()方法表示等待队列中的线程成功获取到共享锁,这时候它需要唤醒它后面的共享节点(如果有),但是当通过releaseShared()方法去释放一个共享锁的时候,接下来等待独占锁跟共享锁的线程都可以被唤醒进行尝试获取。

三、总结

跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁),既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的。

CountDownLatch

CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类。latch 的中文意思是门栓、栅栏,具体怎么解释我就不废话了,大家随意,看两个例子就知道在哪里用、怎么用了。

使用例子

我们看下 Doug Lea 在 java doc 中给出的例子,这个例子非常实用,我们经常会写这个代码。

假设我们有 N ( N > 0 ) 个任务,那么我们会用 N 来初始化一个 CountDownLatch,然后将这个 latch 的引用传递到各个线程中,在每个线程完成了任务后,调用 latch.countDown() 代表完成了一个任务。

调用 latch.await() 的方法的线程会阻塞,直到所有的任务完成。

class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = Executors.newFixedThreadPool(8); // 创建 N 个任务,提交给线程池来执行
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i)); // 等待所有的任务完成,这个方法才会返回
doneSignal.await(); // wait for all to finish
}
}class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
} public void run() {
try {
doWork(i);
// 这个线程的任务完成了,调用 countDown 方法
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
} void doWork() { ...}
}

所以说 CountDownLatch 非常实用,我们常常会将一个比较大的任务进行拆分,然后开启多个线程来执行,等所有线程都执行完了以后,再往下执行其他操作。这里例子中,只有 main 线程调用了 await 方法

我们再来看另一个例子,这个例子很典型,用了两个 CountDownLatch:

class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start(); // 这边插入一些代码,确保上面的每个线程先启动起来,才执行下面的代码。
doSomethingElse(); // don't let run yet
// 因为这里 N == 1,所以,只要调用一次,那么所有的 await 方法都可以通过
startSignal.countDown(); // let all threads proceed
doSomethingElse();
// 等待所有任务结束
doneSignal.await(); // wait for all to finish
}
}class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
} public void run() {
try {
// 为了让所有线程同时开始任务,我们让所有线程先阻塞在这里
// 等大家都准备好了,再打开这个门栓
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {
} // return;
} void doWork() { ...}
}

这个例子中,doneSignal 同第一个例子的使用,我们说说这里的 startSignal。N 个新开启的线程都调用了startSignal.await() 进行阻塞等待,它们阻塞在栅栏上,只有当条件满足的时候(startSignal.countDown()),它们才能同时通过这个栅栏。

Java并发指南9:AQS共享模式与并发工具类的实现Java并发指南9:AQS共享模式与并发工具类的实现转存失败伪共享(False Sharing)
[ Java8中用sun.misc.Contended避免伪共享(false sharing)](
http://blog.csdn.net/aigoogle/article/details/41518369)

我们再次回到arenaExchange()。取得arena中的node节点后,如果定位的节点q 不为空,且CAS操作成功,则交换数据,返回交换的数据,唤醒等待的线程。

如果q等于null且下标在bound & MMASK范围之内,则尝试占领该位置,如果成功,则采用自旋 + 阻塞的方式进行等待交换数据。

如果下标不在bound & MMASK范围之内获取由于q不为null但是竞争失败的时候:消除p。加入bound 不等于当前节点的bond(b != p.bound),则更新p.bound = b,collides = 0 ,i = m或者m – 1。如果冲突的次数不到m 获取m 已经为最大值或者修改当前bound的值失败,则通过增加一次collides以及循环递减下标i的值;否则更新当前bound的值成功:我们令i为m+1即为此时最大的下标。最后更新当前index的值。

Exchanger使用、原理都比较好理解,但是这个源码看起来真心有点儿复杂,是真心难看懂,但是这种交换的思路Doug Lea在后续博文中还会提到,例如SynchronousQueue、LinkedTransferQueue。

最后用一个在网上看到的段子结束此篇博客(http://brokendreams.iteye.com/blog/2253956),博主对其做了一点点修改,以便更加符合在1.8环境下的Exchanger:

其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

  1. 我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
  2. 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
  3. 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
  4. 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
  5. 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。
    6.如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!

总结

CountdownLatch基于AQS的共享锁实现,线程运行完内部调用cas操作修改state – 1,当state为0时阻塞的线程被一起唤醒。

collidebarrier基于lock和condition实现,线程运行到屏障时cas操作修改state – 1,并阻塞在condition上,state= 0 时conditon.signal阻塞的线程。

Semaphore基于AQS共享实现。同上。

exchanger内部保持一个栈(后来改为slot或slot数组),一个线程进入时cas把数据放到栈中,并阻塞,另一个线程进来发现有东西,cas获取并清空,同时把数据放到伙伴线程的一个结构中,唤醒伙伴线程,运行完毕。

写到这里,终于把 AbstractQueuedSynchronizer 基本上说完了,对于 Java 并发,Doug Lea 真的是神一样的存在。日后我们还会接触到很多 Doug Lea 的代码,希望我们大家都可以朝着大神的方向不断打磨自己的技术,少一些高大上的架构,多一些实实在在的优秀代码吧。

更多内容请关注微信公众号【Java技术江湖】

一位阿里 Java 工程师的技术小站。作者黄小斜,专注 Java 相关技术:SSM、SpringBoot、MySQL、分布式、中间件、集群、Linux、网络、多线程,偶尔讲点Docker、ELK,同时也分享技术干货和学习经验,致力于Java全栈开发!(关注公众号后回复”Java“即可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的Java学习指南、Java程序员面试指南等干货资源)

Java并发指南9:AQS共享模式与并发工具类的实现Java并发指南9:AQS共享模式与并发工具类的实现

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,965
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,486
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,331
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,114
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,747
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,781