当前位置: 首页 > news >正文

公司品牌官网建站多元国际二维码图片

公司品牌官网建站,多元国际二维码图片,单页静态网站怎么做,软件培训机构排名前十如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成… 如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成立条件满足,唤醒等待的节点小总结 小结 引言 AQS 抽象队列同步器(AbstractQueuedSynchronizer) 作为Java并发库的基石像ReentrantLock,ThreadPoolExecutor,Semaphore等类都使用到AQS完成线程间同步本文主要来和大家分享一下我自己对AQS实现和设计理念的一些新的思考。 AQS(抽象队列同步器)核心由两部分组成: 条件变量等待队列 条件变量很简单所有场景下的条件变量都可以泛化为一个整数值因此AQS使用一个整数来表示条件变量: private volatile int state;考虑到等待线程数量未知并且经常发生出入队动作因此AQS队列采用链表结构实现那么下面我们首先来看看AQS针对的哪些场景下的问题。 AQS需要解决哪些场景下的问题 AQS需要解决哪些场景下的问题 , 泛化而言有五个方面: 互斥模式 互斥模式下 state 1 表示资源被占有 state 0 表示资源空闲 , state 1 表示可重入情况 共享模式: state 0 表示资源有剩余 , state 0 表示资源不足 混合互斥和共享的模式: 当需要对资源进行修改时需要转到互斥模式仅仅是对资源进行读取操作可以处在共享模式下如: 读写锁场景 栅栏模式: state为剩余未到达栅栏处的线程数先到达栅栏处的线程需要等待直到剩余线程都达到栅栏 , 当state0时说明所有线程都到达栅栏处此时打开栅栏即唤醒所有线程继续执行 条件变量: AQS支持多条件变量,条件变量需要锁的保护,所以当AQS使用条件变量时要求处于互斥模式下此时互斥模式充当互斥锁对条件变量进行保护防止虚假唤醒问题发生 如果大家仔细观察会发现以上四种模式本质都是条件变量的应用不同之处在于对何时满足条件这个定义不同因此AQS也支持第五种模式条件变量当以上四种常见均不能满足你的需求时由你自己设计满足条件的定义是什么。 关于条件变量的实现这里我想特别展开说明一点: 为什么条件变量需要锁的保护 大家可以反向思考一下如果没有锁的保护条件变量的使用会出现什么问题 ? 如果不使用锁来保护你正在等待的数据就会出现虚假唤醒的问题这个问题出现的本质是因为第1步和第4步之间存在一个时间窗口在这个时间窗口内如果线程2执行notify操作那么将使得线程1错过唤醒机会从而出现Lost Wakeup的问题。 解决Lost Wakeup问题的方法就是使用锁来保护等待的数据也就是条件变量如下图所示: 当然如果这边继续深入下去的话大家还可以思考一点: 条件变量和锁有什么联系 ? 两者表现行为都是在条件不满足时阻塞条件满足时被唤醒所以泛化来看锁本质也是条件变量的一种应用那么锁的使用上也存在虚假唤醒问题吗? 首先如果按照问题所述,显然是存在虚假唤醒问题的如上图所述但是这里的错误在于问题混淆了锁的概念严格来说只存在两种锁: 自旋锁睡眠锁 对于睡眠锁而言其本质就是条件变量的一种应用所以存在虚假唤醒问题需要解决。而自旋锁这是通过不断CAS重试来实现锁对于自旋锁而言是不存在虚假唤醒问题的: 所以为了解决睡眠锁可能存在的虚假唤醒问题我们需要使用自旋锁来保证睡眠锁的条件变量example的安全性添加了自旋锁保护的睡眠锁实现如下: 相信经过了上面的讲解大家已经理解了为什么条件变量需要锁的保护了我们常说的锁其实属于睡眠锁睡眠锁本质也是对条件变量的一种实现那么当某个线程获取锁失败后需要进入锁队列中挂起等待如下图所示: 如果我们使用锁来保护条件变量此时成功获取了锁但是发现条件不满足的线程又会进入条件变量关联的队列中阻塞等待如下图所示: 条件变量需要关联一个条件队列睡眠锁本质也是条件变量的应用因此上面说的锁队列本质也是条件队列而锁保护的条件变量关联的队列我们也称之为条件队列因此为了区分本文后面都将称前者为锁队列。 还有一点很重要就是条件变量只能在互斥锁保护下使用共享锁模式下不能使用条件变量否则还是会存在虚假唤醒问题: 上面已经铺垫了很多前置知识对于AQS来说其支持互斥锁共享锁和多条件变量三大特性因此其内部会存在两种类型队列一种就是锁队列还有一种就是条件变量队列如果存在多个条件变量那么就会存在多个条件队列具体如下图所示: 下面我们从源码角度来分析一下三种模式的不同实现。 本文代码均基于JDK 11进行讲解。 互斥模式 当AQS运行在互斥模式下时Node节点结构如下所示: static final class Node {static final int CANCELLED 1;static final int SIGNAL -1;volatile Node prev;volatile Node next;volatile int waitStatus;// 省去互斥模式下使用不到的相关属性...}其中waitStatus属性记录当前节点处于何种状态下互斥模式下节点只可能存在三种状态: SIGNAL : 后继节点successor已经被或将会被阻塞通过 park因此当前节点必须在释放或取消时唤醒它的后继节点。CANCELLED: 该节点因超时或中断被取消。节点永远不会离开此状态。特别地带有取消节点的线程不会再次阻塞。0 : 节点状态值为0存在两个可能性 锁队列尾部节点的状态值为0因为其没有后继节点需要唤醒 释放锁时唤醒锁队列中第一个有效节点此时会先将头结点的状态值设置为0然后唤醒锁队列中第一个有效节点被唤醒的节点尝试去获取锁如果获取失败会将头结点状态重新改回SIGNAL然后再次尝试获取锁如果获取锁成功则将自己设置为头结点此时头结点状态依旧为SIGNAL。 互斥模式下释放锁时会通过头结点的状态值判断当前锁队列是否还存在阻塞的线程如果头结点状态值为0表明当前头结点没有后继节点需要唤醒了如果头结点值为1表明头结点存在后继节点需要唤醒因此我们需要对头结点的状态值变更尤其关注。 互斥模式的实现这里以ReentrantLock为例进行说明。 获取锁 public final void acquire(int arg) {// tryAcquire: 不同模式下对获取锁成功的定义是不同的,对于互斥锁实现而言,state作为一个二元值,0表示锁空闲,1表示锁占用if (!tryAcquire(arg) // 如果获取锁失败,则进入锁队列进行排队等候,返回值代表等待过程中是否被打断过acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}不同模式下对获取锁成功的定义是不同的,因此AQS把tryAcquire方法的实现交由子类决定,对于ReentrantLock实现而言,state作为一个二元值,0表示锁空闲 1表示锁占用 大于1表示发生可重入: 下面以ReentrantLock公平锁为例进行讲解公平体现在线程获取锁时是否先看一眼锁队列是否已经有人在排队中如果是则自己排到锁队列末尾等候。 protected final boolean tryAcquire(int acquires) {final Thread current Thread.currentThread();int c getState();if (c 0) {// 公平体现在每次抢锁前先看一眼是否有人在等着了if (!hasQueuedPredecessors() // 如果锁队列为空,那么尝试去获取锁compareAndSetState(0, acquires)) {// 获取锁成功,设置当前锁的占有者为当前线程自己setExclusiveOwnerThread(current);return true;}}// 判断是否是锁重入else if (current getExclusiveOwnerThread()) {// 累加锁重入计数int nextc c acquires;if (nextc 0)throw new Error(Maximum lock count exceeded);setState(nextc);return true;}return false;}由于ReentrantLock支持锁重入所以实际的state并不只有0和1两种状态。 抢锁失败入队 如果获取锁失败当前线程进入锁队列排队等候首先第一步新为当前线程新创建一个Node并添加到锁队列末尾 private Node addWaiter(Node mode) {Node node new Node(mode);// 为当前线程创建一个Node此处Node为独占模式 for (;;) {// 锁队列是全局共享资源,因此这里采用CAS重试确保尾插操作的原子性 Node oldTail tail;if (oldTail ! null) {node.setPrevRelaxed(oldTail);if (compareAndSetTail(oldTail, node)) {oldTail.next node;return node;}} else {// 初始化锁队列initializeSyncQueue();}}}第二步是把当前线程给挂起:acquireQueued函数不仅包含把当前线程挂起的逻辑还包含被唤醒后尝试抢锁的逻辑如果失败继续挂起 final boolean acquireQueued(final Node node, int arg) {boolean interrupted false;try {for (;;) {// 获取当前线程前置节点final Node p node.predecessor();// 前置节点是头结点才有资格去抢锁if (p head tryAcquire(arg)) {// 抢锁成功,设置当前节点为头结点// 这里的头结点可以看做是Dummy NodesetHead(node);p.next null; // help GC// 阻塞在锁队列等待锁的过程中是否被打断过return interrupted;}// 根据前驱节点状态,判断是否需要阻塞等待还是再尝试获取一次锁if (shouldParkAfterFailedAcquire(p, node))// 挂起当前线程interrupted | parkAndCheckInterrupt();}} catch (Throwable t) {// 挂起线程和唤醒后抢锁的逻辑过程中如果发生异常,则将当前Node状态设置为CANCELcancelAcquire(node);// 如果以上过程中线程被打断过,那么这里补充一次自我打断if (interrupted)selfInterrupt();throw t;}}shouldParkAfterFailedAcquire函数根据节点的不同状态,判断当前线程自身是应该立即挂起还是再尝试获取一次锁: 前置节点如果处于SIGNAL状态,则表明锁依旧被前驱节点所在线程持有当前线程应该立即阻塞前置节点处于取消状态此时做一波清理工作将所有处于取消态的节点都移出队列然后当前线程再尝试偶去一次锁前置节点状态值为0那么前置节点可能为旧的尾节点或者头结点如果是旧的尾节点则将其状态更正为SIGNAL因为当前节点变成新的尾节点了。如果前置节点是头结点说明当前线程被唤醒后抢锁失败则再次将头结点状态更正为SIGNAL。最后再尝试获取一次锁。 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws pred.waitStatus;// 当前节点的前置节点依然处于持有锁的状态,当前线程应该立即阻塞if (ws Node.SIGNAL)return true; // 当前线程应该阻塞// 前置节点处于取消状态,此处从前置节点开始往前扫描,将所有处于取消状态的节点都移出锁队列 if (ws 0) {do {node.prev pred pred.prev;} while (pred.waitStatus 0);pred.next node;} else {// 此处waitStatues的值可能为0或者PROPAGATE(共享模式下节点才会存在该状态,此处不关心)pred.compareAndSetWaitStatus(ws, Node.SIGNAL);}return false;}AQS队列采用双向链表实现其中prev指针就用在对Cancel节点的回收中其他场景不会用到。 parkAndCheckInterrupt方法将当前线程挂起,被唤醒后返回当前线程是否被打断过: private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}interrupted方法会清空打断标记因此如果必要可以在被唤醒并成功抢到锁后补充一次自我打断的逻辑。 释放锁 release方法用于释放锁: public final boolean release(int arg) {// 由子类实现决定本次锁释放是否成功if (tryRelease(arg)) {Node h head;// 锁队列刚初始化的时候,头结点的waitStatus0// 但是当第一个Node入队后,头结点的waitStatus应该等于SIGNAL,表明当前头结点有后继节点需要唤醒if (h ! null h.waitStatus ! 0)// 说明锁队列不为空,此时唤醒后继结点unparkSuccessor(h);return true;}return false;}tryRelease方法尝试去释放锁不同模式下对锁被成功释放的定义不同因此这个方法需要子类继承实现此处给出ReentrantLock的实现: protected final boolean tryRelease(int releases) {// 由于存在可重入清空,因此只有state递减为0时,才算锁被释放成功int c getState() - releases;// 非法情况,只有持有锁的线程才能释放当前锁if (Thread.currentThread() ! getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free false;// state递减为0的时候,才算锁被释放成功if (c 0) {free true;setExclusiveOwnerThread(null);}setState(c);return free;}unparkSuccessor方法唤醒某个节点的后继结点这里是唤醒头结点的后继结点: private void unparkSuccessor(Node node) {int ws node.waitStatus;// 将头结点状态值设置为0if (ws 0)node.compareAndSetWaitStatus(ws, 0);// 获取后继节点Node s node.next;// 如果当前节点没有后继节点或者后继节点处于取消状态,则向后寻找到第一个没有取消的节点// 然后唤醒该节点if (s null || s.waitStatus 0) {s null;for (Node p tail; p ! node p ! null; p p.prev)if (p.waitStatus 0)s p;}// 唤醒节点if (s ! null)LockSupport.unpark(s.thread);}如果被唤醒的线程抢锁成功则会在acquireQueued方法的for循环中将自己设置为新的头结点此时新的头结点的状态值可能为SIGNAL或者0 如果是SIGNAL说明当前节点还存在后继节点需要唤醒否则说明当前节点是尾结点。 如果被唤醒的线程抢锁失败则会在shouldParkAfterFailedAcquire方法中再次将头结点的状态设置为SIGNAL因为当前头结点抢锁失败了。 小总结 AQS互斥模式的实现到此就讲解完毕了整体而言还是很简单的这里简单总结一下互斥模式下抢锁和释放锁的步骤 首先是抢锁步骤: 尝试获取锁获取失败入队阻塞获取锁成功则返回返回前检查获取锁的过程中是否存在被吞没的中断如果存在则补充一次自我中断 其次是释放锁步骤: 判断锁队列是否为空,为空则条件唤醒阶段如果不为空则唤醒第一个有效节点 上面总结了一下AQS互斥模式的运行流程下面我们来看看AQS互斥模式的实现过程中有哪些值得我们学习的并发设计技巧 AQS中有两个共享资源需要被保护一个是state条件值另一个是锁队列这里重点在队列的头尾节点可能会出现并发问题。state变量可能存在的并发安全问题在多线程同时尝试抢锁设置state的值为1该场景下只需要单次CAS设置即可如果失败说明其他线程已经抢到锁了那么当前线程执行入队阻塞流程。对于队列头尾节点的访问而言并发问题可能出在多线程共同尝试阻塞入队此时需要CAS配合重试确保入队成功还有就是节点状态值的变更该场景下只需要单次CAS即可如果失败说明有其他线程更改了当前节点状态那么当前线程的更新可以废弃。 共享模式 当AQS运行在共享模式下时此时Node节点的结构如下所示: static final class Node {static final Node SHARED new Node();static final Node EXCLUSIVE null;static final int CANCELLED 1;static final int SIGNAL -1;static final int PROPAGATE -3;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared SHARED;}// 省去共享模式下使用不到的相关属性...}相比于互斥模式这里节点增加了PROPAGATE状态并且还增加了shared属性来表明当前阻塞节点是处在共享模式下还是互斥模式下。 其实共享模式下用不到shared属性该属性是应用在混合模式下但是这里为了下面讲解源码方便理解就提前拿出来说了。 PROPAGATE状态主要是为了解决并发情况下的唤醒丢失问题而提出的下面会专门有一小节来讲解该状态的作用这里暂不做解释。 共享模式的实现这里以Semaphore为例进行说明。 获取共享资源 Semaphore获取资源的方法是acquireSharedInterruptibly public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 如果当前线程被打断了,则抛出打断异常 if (Thread.interrupted())throw new InterruptedException();// 尝试去获取共享资源 -- 如何才算获取共享资源成功,由子类实现决定if (tryAcquireShared(arg) 0)// 资源数量不足,则进入阻塞阶段doAcquireSharedInterruptibly(arg);}由子类实现决定剩余资源够不够分配给当前线程: protected int tryAcquireShared(int acquires) {for (;;) {// Semaphore获取资源时也有公平和非公平两种模式,这里展示的是公平模式的实现// 所谓公平就是抢夺资源前先看看有没有人在排队if (hasQueuedPredecessors())return -1;// remaining小于0则表示正在等待资源的线程数量// 如果remaining0 说明资源还够,那么当前线程就可以获取到资源,无需阻塞等待了 // note: remaining是减去当前线程所需资源后的剩余资源数量 int available getState();int remaining available - acquires;// 如果剩余资源不足,则直接返回对应负值,表示资源不足需要等待if (remaining 0 ||// 扣减完后,剩余资源数量大于等于0,则使用cas配合重试确保原子性扣减资源数量成功compareAndSetState(available, remaining))return remaining;}}如果剩余资源不足当前线程需要进入资源队列进行等待这段逻辑存在于doAcquireSharedInterruptibly方法中当然该方法还包含线程被唤醒后再次尝试抢锁的逻辑: private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 为当前线程创建Node节点尾插到资源队列尾部,节点模式是共享模式final Node node addWaiter(Node.SHARED);try {for (;;) {// 判断自己是否是头结点的后继节点final Node p node.predecessor();if (p head) {// 如果是头结点的后继节点,则尝试再次去获取资源int r tryAcquireShared(arg);// 与互斥模式不同的一点在于,如果当前线程获取资源成功,那么它会唤醒其后继节点继续尝试获取资源// 后继节点获取成功后,再唤醒它的后继节点,直到资源不足,则停止唤醒if (r 0) {// 如果剩余资源数量为0,也会进入到该方法,但是不会执行链式唤醒过程// 只是把当前节点更新为新的头节点setHeadAndPropagate(node, r);p.next null; // help GCreturn;}}// 根据前驱节点状态判断是否应该阻塞if (shouldParkAfterFailedAcquire(p, node) // 如果当前线程是被中断唤醒的,那么直接抛出中断异常parkAndCheckInterrupt())throw new InterruptedException();}} catch (Throwable t) {// 将当前节点状态设置为取消状态cancelAcquire(node);throw t;}}与互斥模式不同的一点在于如果共享模式下线程被唤醒后成功获取到资源当前线程会接着唤醒其后继节点让其继续获取资源后继节点在发现还有资源的情况下继续唤醒其后继节点直到资源不足停止继续往后唤醒整个过程实际是一个链式唤醒的过程。 下面我们进入释放共享资源小节来看看链式唤醒的整个过程是如何发生的。 释放共享资源 releaseShared方法负责完成对资源的释放释放完资源后唤醒等待中的线程: public final boolean releaseShared(int arg) {// 资源怎样算释放成功这是由子类决定的 if (tryReleaseShared(arg)) {// 如果资源释放成功,则进入链式唤醒过程doReleaseShared();return true;}return false;}tryReleaseShared方法中采用cas重试确保资源数原子性累加成功: protected final boolean tryReleaseShared(int releases) {// 因为可能存在多个线程同时尝试归还资源,因此这里为了确保能够原子性累加成功// 采用cas重试机制实现for (;;) {int current getState();int next current releases;if (next current) // overflowthrow new Error(Maximum permit count exceeded);if (compareAndSetState(current, next))return true;}}doReleaseShared方法开启链式唤醒过程: private void doReleaseShared() {for (;;) {Node h head;// 确保资源队列不为空if (h ! null h ! tail) {int ws h.waitStatus;// 确保头结点存在后继节点if (ws Node.SIGNAL) {// 设置头结点的状态值为0,如果cas设置失败,说明可能存在多个线程同时尝试开启链式唤醒if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck cases// 唤醒头结点的后继节点unparkSuccessor(h);}// 存在并发唤醒的情况处理 --- 具体可以看唤醒丢失小节else if (ws 0 !h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}// 头结点没有变动的情况下,说明没有竞争唤醒问题,则跳出循环// 否则尝试继续唤醒变动后的头结点的下一个后继节点if (h head) // loop if head changedbreak;}}被唤醒的线程会在doAcquireSharedInterruptibly方法的for循环中继续尝试去获取资源如果获取成功并且还有剩余资源则唤醒自己的后继节点: private void setHeadAndPropagate(Node node, int propagate) {// 更新当前成功获取资源的节点为新的头结点Node h head;setHead(node);// propagate代表剩余资源剩余数量,只有在还有剩余资源的情况下才会去唤醒自己的后继节点// h保留的是旧的头结点如果旧的头结点状态值为PROPAGATE则再尝试唤醒其后继节点去获取一下资源试试if (propagate 0 || h null || h.waitStatus 0 ||(h head) null || h.waitStatus 0) {Node s node.next;// 如果当前节点是链表尾节点或者后继节点是共享类型节点则尝试进行唤醒if (s null || s.isShared())doReleaseShared();}}如果被唤醒的线程获取资源失败则会进入shouldParkAfterFailedAcquire将头结点状态由0或者PROPAGATE更新为SIGNAL然后自己再尝试获取一次资源: 如果不考虑并发唤醒的问题那么整个链式唤醒过程如下图所示: 唤醒丢失问题 并发唤醒会使得整个问题复杂起来为了解决这个问题AQS引入了PROPAGATE状态在JDK 6之前是没有这个状态的当时的Semphore实现存在一个bug如下图所示: bug: https://bugs.openjdk.java.net/browse/JDK-6801020fix: https://github.com/openjdk/jdk8u/commit/b63d6d68d93ebc34f8b4091a752eba86ff575fc2 t3 调用 tryReleaseShared 方法释放 1 个许可然后调用 unparkSuccessor 方法将 head.waitStatus 由 SIGNAL 改为 0并唤醒后继节点 t1 后退出t1 被 t3 唤醒调用 tryAcquireShared 方法获取到许可并返回 0此时还未调用 setHeadAndPropagate 方法中的 setHead 方法将自己设置为新 headt4 调用 tryReleaseShared 方法释放 1 个许可因为 head 未改变因此 head.waitStatus 仍为 0这导致 t4 退出不会继续调用 unparkSuccessor 方法唤醒后继节点 t2t1 继续调用 setHeadAndPropagate 方法首先将自己设置为新 head然后因为 tryAcquireShared 方法返回 0 导致 t1 退出不会继续调用 unparkSuccessor 方法唤醒后继节点 t2 至此t2 永远不会被唤醒问题产生。 上述bug产生的原因就是因为共享锁的获取和释放在同一时刻很可能会有多条线程并发执行这就导致在这个过程中可能会产生这种 waitStatus 为 0 的中间状态而AQS会将waitStatus状态值为0看做是资源队列为空的信号因此此时产生的尴尬问题就是明明还有资源但是确无法唤醒等待队列中的线程因此通过引入 PROPAGATE 状态来解决这个问题。 为了解决这个问题为头结点引入了PROPAGATE状态此时我们再来看引入PROPAGATE状态后的流程是怎样的: t3 调用 tryReleaseShared 方法释放 1 个许可然后调用 doReleaseShared 方法将 head.waitStatus 由 SIGNAL 改为 0并唤醒后继节点 t1 后退出t1 被 t3 唤醒调用 tryAcquireShared 方法获取到许可并返回 0此时还未调用 setHeadAndPropagate 方法中的 setHead 方法将自己设置为新 headt4 调用 tryReleaseShared 方法释放 1 个许可因为 head 未改变因此 head.waitStatus 仍为 0然后调用 doReleaseShared 方法将 head.waitStatus 由 0 改为 PROPAGATE 后 t4 退出t1 继续调用 setHeadAndPropagate 方法首先将自己设置为新 head因为此时旧 head.waitStatus 为 PROPAGATE 且同步队列中 t1 还有后继节点 t2所以继续调用 doReleaseShared 方法将 head.waitStatus 由 SIGNAL 改为 0并唤醒后继节点 t2 后退出 后继节点 t2 被唤醒问题解决。 引入 PROPAGATE 状态还有一个好处加速唤醒后继节点。 doReleaseShared 方法中有这个条件判断 if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;如果没有 PROPAGATE 状态当多条线程同时运行到这里后可能就直接退出了虽然这时有个线程正在调用 unparkSuccessor 方法去唤醒后继节点但唤醒后的线程也需要等到获取到锁且成为头节点后才能调用 doReleaseShared 方法再去唤醒后继节点。 当并发大时在这个过程中很有可能会有新节点入队并满足唤醒条件所以有了 PROPAGATE 状态当多条线程同时运行到这里后CAS 失败后的线程可以再次去循环判断能否唤醒后继节点如果满足唤醒条件就去唤醒。 毕竟调用 doReleaseShared 方法越多、越早就越有可能更快的唤醒后继节点。 线程t6先入队阻塞然后线程t4和t5释放资源如果没有上面那句判断那么就不会将头结点设置为PROPAGATE状态也就不会去唤醒t2和t6了。 小总结 AQS共享模式的实现到此就讲解完毕了整体而言除了链式唤醒那块比较难搞之外其他部分还是很容易理解的下面简单总结一下共享模式下抢夺资源和释放资源的步骤首先是抢夺资源的步骤: 尝试获取资源获取失败则入队阻塞获取成功直接返回 其次是释放锁步骤: 释放资源累加资源计数判断头结点是否存在后继结点如果存在则设置当前头结点状态为0,然后唤醒其后继结点当前线程结束工作被唤醒的线程尝试去获取资源如果获取失败则更正头结点状态为SIGNAL然后自己再尝试获取一次还是不行就把自己挂起如果获取成功设置当前Node为新的头结点然后判断是否还有剩余资源如果有的话就唤醒自己的后继节点后继节点重复链式唤醒步骤直到资源不足或者等待队列为空 链式唤醒过程的讲述不包含并发唤醒处理步骤这一块比较复杂大家可以参考唤醒丢失小节自行总结。 共享模式的实现中用到的并发技巧还是CAS重试全程并没有使用到睡眠锁。 混合模式 当AQS运行在混合模式下时此时Node节点的结构如下图所示: static final class Node {static final Node SHARED new Node();static final Node EXCLUSIVE null;static final int CANCELLED 1;static final int SIGNAL -1;static final int PROPAGATE -3;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared SHARED;}// 省去混合模式下使用不到的相关属性...}混合模式的实现这里以ReentrantReadWriteLock读写锁为例进行说明。 读写锁场景下我们主要需要关心读写互斥的问题首先读读肯定是并发的但是读写写写操作必须是互斥的还有几个问题我们也需要考虑一下: 线程1持有读锁等待队列中有线程2在等待写锁如果此时线程3来请求读锁请问线程3时需要等待还是直接获取读锁呢线程1持有读锁此时线程1又想要去获取写锁请问在此场景下锁升级操作是否被允许发生呢线程1持有写锁此时线程1又想要去获取读锁请问在此场景下锁降级操作是否被允许发生呢 这里我先回答第2和第3个问题: 读锁不能直接升级为写锁而写锁可以降级为读锁为什么读锁不能升级为写锁 当多个线程同时持有读锁时它们可以并发地读取数据因为读操作不会影响数据的一致性。然而如果读锁能够直接升级为写锁那么问题就会变得复杂。如果一个线程在持有读锁的情况下升级为写锁那么其他线程可能正在读取数据升级为写锁后其他线程可能会在不知情的情况下访问到无效或不一致的数据从而破坏了读锁的目的。因此为了保证数据的一致性一般情况下读锁不能直接升级为写锁。 为什么写锁可以降级为读锁 写锁可以降级为读锁是因为写锁保证了数据的一致性而降级为读锁不会引起数据一致性的问题。当一个线程持有写锁时它是独占访问资源的其他线程无法读取或写入数据。如果此时没有其他写操作需要进行写锁可以降级为读锁允许其他线程并发地读取数据因为读操作不会影响数据的一致性。 关于第一个问题的答案我们将在接下来的代码解析环节揭晓。 获取写锁 我们首先来看读写锁实现中获取写锁的方法实现: public final void acquire(int arg) {if (!tryAcquire(arg) acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}AQS提供的acquire模版方法就不多讲了大家可以自行回忆整个过程其中我们来看看读写锁是如何判断写锁是否获取成功的这个过程: protected final boolean tryAcquire(int acquires) {Thread current Thread.currentThread();// state的前半部分记录读锁数量,后半部分记录写锁数量int c getState();int w exclusiveCount(c);// 如果存在读锁或者写锁,c!0的条件都会满足if (c ! 0) {// 只有存在写锁并且是当前线程持有的写锁下面这个条件才不会满足// 否则说明当前写锁获取失败if (w 0 || current ! getExclusiveOwnerThread())return false;// 如果是当前线程自己的写锁重入,则判断是否重入次数过多 if (w exclusiveCount(acquires) MAX_COUNT)throw new Error(Maximum lock count exceeded);// Reentrant acquire// 累加重入次数setState(c acquires);return true;}// 此时不存在任何锁,那么为什么还要判断当前写锁是否应该阻塞呢// 因为如果多个线程同时执行到当前逻辑,那么可能已经有线程已经抢到锁了,然后某几个线程已经入队等候了// 当前线程执行到此处时发生上下文切换,再次切换回来执行时,正好碰上锁被释放// 判断逻辑: 当前是公平还是非公平锁,如果是公平锁,则看是否有人排队等候锁,否则该函数永远返回falseif (writerShouldBlock() ||// cas尝试去获取锁,如果失败了则返回false!compareAndSetState(c, c acquires))return false;// 抢锁成功,设置锁的占有线程为当前线程 setExclusiveOwnerThread(current);return true;}如果当前存在锁并且不属于自身写锁可重入场景则表明抢锁失败当前线程需要入队阻塞。 如果不存在锁则CAS尝试抢锁失败了还是入队阻塞。 关于判断写锁是否应该阻塞的公平和非公平实现如下: static final class NonfairSync extends Sync {final boolean writerShouldBlock() {return false; // writers can always barge}...}static final class FairSync extends Sync {final boolean writerShouldBlock() {return hasQueuedPredecessors();}...}释放写锁 读写锁实现中释放写锁的代码如下所示: public final boolean release(int arg) {if (tryRelease(arg)) {Node h head;if (h ! null h.waitStatus ! 0)unparkSuccessor(h);return true;}return false;}AQS提供的release模版方法这里不再多说下面来看看tryRelease方法是如何完成写锁的释放流程的: protected final boolean tryRelease(int releases) {// 要确保持有锁的线程是自己if (!isHeldExclusively())throw new IllegalMonitorStateException();int nextc getState() - releases;// 判断写锁计数递减后是否为0boolean free exclusiveCount(nextc) 0;// 不为0说明存在锁重入if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}获取读锁 读写锁实现中获取读锁的代码如下所示: public final void acquireShared(int arg) {if (tryAcquireShared(arg) 0)doAcquireShared(arg);}AQS提供的acquireShared模版方法这里不再多说下面来看看tryAcquireShared方法是如何完成读锁的获取流程的: protected final int tryAcquireShared(int unused) {Thread current Thread.currentThread();int c getState();// 考虑读锁升级写锁场景: 如果当前存在写锁并且不是当前线程只有的写锁,那么获取读锁失败if (exclusiveCount(c) ! 0 getExclusiveOwnerThread() ! current)return -1;// 获取读锁个数 int r sharedCount(c);// 如果当前读操作需要被阻塞,那么需要到fullTryAcquireShared函数中检查是否真的需要阻塞if (!readerShouldBlock() // 确保读锁数量没有达到上限r MAX_COUNT // 尝试累加全局读锁计数compareAndSetState(c, c SHARED_UNIT)) {// 首个获取读锁的线程单独记录if (r 0) {firstReader current;firstReaderHoldCount 1;} else if (firstReader current) {firstReaderHoldCount;} else {// 其他获取读锁的线程使用ThreadLocal记录其获取的读锁数量HoldCounter rh cachedHoldCounter;if (rh null ||rh.tid ! LockSupport.getThreadId(current))// cachedHoldCounter记录最新一次获取读锁的线程的HoldCountercachedHoldCounter rh readHolds.get();else if (rh.count 0)readHolds.set(rh);rh.count;}return 1;}// 这里存在三种情况会进入下面这个方法的逻辑:// 1. readerShouldBlock方法判断当前读锁获取应该阻塞等待// 2. 读锁获取数量达到最大值// 3. 尝试原子性累加全局读锁计数失败return fullTryAcquireShared(current);}fullTryAcquireShared方法负责处理以下几种场景: 如果发现写锁不为空,要确保持有写锁的线程始终是当前线程自己,否则获取读锁失败返回 - 1 表示当前线程需要阻塞readerShouldBlock()方法返回true时表示当前线程需要入队阻塞但是如果当前线程已经持有写锁或读锁它仍然可以继续执行如果当前线程不持有锁那么返回 -1 表示当前线程需要阻塞检查读锁获取的总次数是否超过最大限制cas原子性的累加读锁计数器如果失败则重复以上检查过程最后再次尝试cas原子累加 final int fullTryAcquireShared(Thread current) { HoldCounter rh null;for (;;) {//阶段1: 前置检查 int c getState();// 如果发现写锁不为空,要确保持有写锁的线程始终是当前线程自己,否则获取读锁失败if (exclusiveCount(c) ! 0) {if (getExclusiveOwnerThread() ! current)return -1;// 判断当前线程是否需要入队等待} else if (readerShouldBlock()) {// 如果当前线程还没有持有读锁,那么情况当前线程的threadLocal本地缓存中记录的读取计数器if (firstReader ! current) {if (rh null) {rh cachedHoldCounter;if (rh null ||rh.tid ! LockSupport.getThreadId(current)) {rh readHolds.get();if (rh.count 0)readHolds.remove();}}// 如果当前线程不持有任何锁,则返回-1表示当前线程需要入队阻塞// 如果当前线程持有读锁,那么即便readerShouldBlock返回true,这里也不会阻塞// 为了防止死锁的发生if (rh.count 0)return -1;}}// 如果读锁获取的总次数达到最大限制,则抛出异常if (sharedCount(c) MAX_COUNT)throw new Error(Maximum lock count exceeded);//阶段2: 累加计数器// 累加全局读锁计数 if (compareAndSetState(c, c SHARED_UNIT)) {// 说明当前线程是第一个获取读锁的线程if (sharedCount(c) 0) {firstReader current;firstReaderHoldCount 1;} else if (firstReader current) {// 处理获取读锁的首位线程的读锁重入firstReaderHoldCount;} else {// 普通情况处理if (rh null)// 先获取最近一次获取读锁的线程的HoldCounterrh cachedHoldCounter;// 如果最近一次获取读锁的线程记录不存在,或者其记录的不是当前线程 if (rh null ||rh.tid ! LockSupport.getThreadId(current))// 从线程自己的本地缓存获取自己的HoldCounterrh readHolds.get();// 如果最近一次获取读锁的线程就是当前线程,那么累加当前线程本地读锁计数器 else if (rh.count 0)readHolds.set(rh);rh.count;// 记录最近一次获取读锁的线程的读锁计数器cachedHoldCounter rh; // cache for release}// 返回1表示获取读锁成功return 1;}}}fullTryAcquireShared方法大体分为两个阶段前置检查和累加计数其中前置检查阶段需要特别注意一点: 即使 readerShouldBlock 返回 true如果当前线程已经持有写锁或读锁它仍然可以继续执行readerShouldBlock()那段if…else中没有检查写锁是否为0,是因为写锁的检查已经在第一个if块中进行了检查这种设计的主要目的是避免因为同一个线程在持有锁时被阻塞导致整个程序出现死锁的情况。如果允许同一个线程在持有锁时被阻塞那么在某些情况下可能会导致线程间的循环依赖从而引发死锁。 读锁是否应该阻塞 关于读锁是否应该阻塞这个问题其公平版本实现还是判断等待队列中是否存在线程正在等待这一点和之前一样关键是其非公平版本实现判断逻辑: 如果等待队列头结点的后继节点是独占模式也就是当前阻塞线程期望获取的是写锁那么当前读锁需要阻塞否则当前读锁无需阻塞 static final class NonfairSync extends Sync {final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}static final class FairSync extends Sync {final boolean readerShouldBlock() {return hasQueuedPredecessors();}}final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h head) ! null (s h.next) ! null !s.isShared() s.thread ! null;}关于读锁的非公平版本实现主要是考虑到写锁的饥饿性问题: Writer Starvation写线程饥饿这是指在一个读写锁的多线程环境中由于读线程频繁获取锁写线程可能无法获取到写锁从而长时间被阻塞。这可能导致写线程无法完成关键的操作从而影响程序的性能和响应性。启发式方法为了避免写线程饥饿这段代码注释中提到了一种启发式方法。在这个方法中当一个线程出现在队列的队首appears to be head of queue且如果这个线程是一个正在等待写锁的线程waiting writer则当前线程会被阻塞。概率效应需要注意的是这只是一个概率性的效应。这是因为即使出现一个等待写锁的线程在队首一个新的读线程不会被阻塞如果在它之前有其他的启用的读线程还没有从队列中出来。这是为了在一定程度上平衡读写线程的获取锁的机会避免写线程长时间被阻塞。 大家可以回想一下本篇开始提出的问题一相信大家已经都知晓答案了。 释放读锁 读写锁实现中释放读锁的代码如下所示: public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}AQS提供的releaseShared模版方法这里不再多说下面来看看tryReleaseShared方法是如何完成读锁的释放流程的: protected final boolean tryReleaseShared(int unused) {Thread current Thread.currentThread();// 如果当前线程是第一个获取读锁的线程if (firstReader current) {// assert firstReaderHoldCount 0;// 如果读锁没有出现可重入获取,直接将firstReader置空if (firstReaderHoldCount 1)firstReader null;else// 递减读锁获取次数,等到为0时,才算读锁被真正释放firstReaderHoldCount--;} else {// 通过缓存拿到最近一次获取读锁的线程的HoldCounter计数HoldCounter rh cachedHoldCounter;// 如果缓存中没有,或者缓存记录的不是当前线程的读计数器,那么从当前线程本地缓存中获取if (rh null ||rh.tid ! LockSupport.getThreadId(current))rh readHolds.get();// 如果读锁获取计数器1则直接释放当前读锁int count rh.count;if (count 1) {readHolds.remove();if (count 0)throw unmatchedUnlockException();}// 否则递减计数器--rh.count;}// 通过cas加重试确保全局计数器递减成功for (;;) {int c getState();int nextc c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.// nextc 0 条件满足说明读锁和写锁计数器都为0return nextc 0;}}小总结 关于混合模式的实现其本身结合使用了互斥模式和共享模式因此锁队列中既存在类型为独占模式的Node也存在类型为共享模式的Node,如下图所示: 获取写锁时如果当前存在锁并且不属于自身写锁可重入场景则抢锁失败当前线程入队阻塞Node节点类型为独占模式如果不存在锁则CAS尝试抢锁失败了还是入队阻塞。释放写锁时判断锁计数递减后是否为0如果是则说明锁释放成功然后唤醒当前头节点的后继节点否则跳过唤醒阶段获取读锁时,分为以下几类场景: 存在写锁但是写锁不是自己的则获取读锁失败返回 -1 当前线程需要阻塞存在写锁 写锁时自己的此时属于写锁降级为读锁的场景累加读锁计数然后返回1表示获取读锁成功不存在写锁此时readerShouldBlock方法返回true,表示当前线程需要入队阻塞但是如果当前线程此时已经持有读锁它仍然可以继续执行否则返回-1表示当前线程需要阻塞如果读锁获取的总次数超过最大限制抛出异常cas原子性的累加读锁计数器如果失败则重复以上检查过程如果没问题继续尝试cas原子累加计数器 释放读锁时先递减当前线程本地缓存的计数器值然后再递减全局锁计数器的值只有当全局锁计数器递减后的值为0时才表明当前锁时真正被完全释放掉了 之前在共享模式篇说过共享锁的释放流程走的是链式唤醒流程而在混合模式下如果链式唤醒过程中遇到了独占节点也会提前终止链式唤醒流程: private void setHeadAndPropagate(Node node, int propagate) {Node h head;setHead(node);if (propagate 0 || h null || h.waitStatus 0 ||(h head) null || h.waitStatus 0) {Node s node.next;// 唤醒的前提是后继节点必须是共享节点,而不是独占节点if (s null || s.isShared())doReleaseShared();}}栅栏模式 当AQS处在栅栏模式下时此时state表示剩余未到达栅栏处的线程数先到达栅栏处的线程需要等待直到剩余线程都达到栅栏 , 当state0时说明所有线程都到达栅栏处此时打开栅栏即唤醒所有线程继续执行 当AQS运行在栅栏模式下时此时Node节点的结构如下所示: static final class Node {static final Node SHARED new Node();static final Node EXCLUSIVE null;static final int CANCELLED 1;static final int SIGNAL -1;static final int PROPAGATE -3;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared SHARED;}// 省去栅栏模式下使用不到的相关属性...}栅栏模式这里以CountDownLatch为例进行说明,CountDownLatch的构造函数需要传入一个count值表明参与本轮任务的线程数量: public CountDownLatch(int count) {if (count 0) throw new IllegalArgumentException(count 0);this.sync new Sync(count);}严格来说CyclicBarrier才是栅栏模式的真正实现CountDownLatch只是栅栏模式的简化版本和栅栏模式稍有区别在于CountDownLatch只有一轮循环也就是所有线程到达栅栏处后就结束所有线程的执行而CyclicBarrier会在本轮循环结束后重置计数器开启下一轮循环。 等待 主线程调用await方法等待所有线程完成任务到达栅栏处: public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) 0)doAcquireSharedInterruptibly(arg);}acquireSharedInterruptibly这个模版方法就不多说了tryAcquireShared方法实现也很简单当state递减为0时表明所有线程都已经到达栅栏处了本轮结束主线程可以继续往下执行了 protected int tryAcquireShared(int acquires) {return (getState() 0) ? 1 : -1;}递减计数 当前线程完成自己的任务后递减计数: public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}releaseShared模版方法这里就不多说了tryReleaseShared的实现也很简单通过cas配合重试递减计数器的值如果计数器的值被递减到0了说明当前线程是最后一个到达栅栏处的接下来就可以唤醒等待中的主线程了: protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c getState();if (c 0)return false;int nextc c - 1;if (compareAndSetState(c, nextc))return nextc 0;}}条件变量模式 当AQS运行在条件变量模式下时此时Node节点的结构如下所示: static final class Node {static final Node SHARED new Node();static final Node EXCLUSIVE null;static final int CANCELLED 1;static final int SIGNAL -1;static final int PROPAGATE -3;static final int CONDITION -2;volatile Node prev;volatile Node next;volatile int waitStatus;// 此处变量原名nextWaiter,该变量原本只应该应用在条件变量模式下,但是AQS为了节省一个变量,进行了复用// 所以这里改名方便下面理解Node shared;final boolean isShared() {return shared SHARED;}// 如果当前Node处在条件队列中,那么该属性链接当前节点在队列中的下一个节点Node nextWaiter;// 省去共享模式下使用不到的相关属性...}条件变量模式大家需要注意两点: 条件变量需要互斥锁保护此模式下存在两个队列一个是锁队列一个是条件队列如果存在多个条件变量那么就会有多个条件队列 这两套队列复用一套Node结构 此处还未Node节点新增一个状态CONDITION此状态用于描述处于条件队列中的节点。 这里以ReentrantLock中的ConditionObject实现来看看条件变量在AQS中是怎么玩的。 等待条件成立 await方法为当前线程创建node节点并加入对应的条件队列中等待直到被signal唤醒或者中断唤醒才会结束等待状态: public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 为当前线程创建一个Node节点然后尾插到条件队列尾部 Node node addConditionWaiter();// 因为当前线程需要在条件队列中挂起等待所以挂起前需要完全释放掉当前线程持有的锁// 返回值是阻塞前持有的锁计数int savedState fullyRelease(node);int interruptMode 0;// 当条件成立时,会将当前node节点从条件队列移入锁队列while (!isOnSyncQueue(node)) {// 挂起当前线程,等待条件成立LockSupport.park(this);// 被唤醒后检查是否是被中断唤醒的,如果是被中断唤醒的并且转移节点到锁队列成功,返回-1,并跳出循环等待// 如果是被中断唤醒的但是cas转移节点到锁队列失败了此时返回1并跳出循环等待if ((interruptMode checkInterruptWhileWaiting(node)) ! 0)break;}// 当前节点已经转移到锁队尾排队等候锁了,此时调用acquireQueued把当前线程重新挂起// 等待被唤醒后抢锁,如果失败,继续挂起,直到抢锁成功为止// acquireQueued方法返回值表示等待锁期间是否发生了中断,如果发生了则记录中断标记为REINTERRUPT// 如果阻塞前线程发生了可重入并重入了3次那么此处抢锁要确保初始锁计数为3if (acquireQueued(node, savedState) interruptMode ! THROW_IE)interruptMode REINTERRUPT;// 条件满足,当前线程被唤醒,并且唤醒后抢锁成功// 先清理一下队列中处于取消等待状态的节点 if (node.nextWaiter ! null) // clean up if cancelledunlinkCancelledWaiters();// 在条件队列上等待期间或者锁队列上等待期间发生了中断则对中断进行分类处理if (interruptMode ! 0)reportInterruptAfterWait(interruptMode);}addConditionWaiter方法负责为当前线程创建一个Node节点然后尾插到条件队列尾部: private Node addConditionWaiter() {// 确保当前线程持有保护当前条件变量的互斥锁if (!isHeldExclusively())throw new IllegalMonitorStateException();// 清理一下条件队列中处于取消状态的Node节点Node t lastWaiter;// If lastWaiter is cancelled, clean out.if (t ! null t.waitStatus ! Node.CONDITION) {// 负责遍历整个条件队列链表,删除掉所有处于CANCEL状态的节点unlinkCancelledWaiters();t lastWaiter;}// 为即将入队的当前线程创建一个Node节点尾插到当前条件队列中Node node new Node(Node.CONDITION);if (t null)firstWaiter node;elset.nextWaiter node;lastWaiter node;return node;}fullyRelease方法负责完成释放掉当前线程持有的所有锁: final int fullyRelease(Node node) {try {// 返回先前持有的锁计数int savedState getState();// 释放掉当前线程持有的锁如果是可重入场景,也是全部释放掉if (release(savedState))return savedState;throw new IllegalMonitorStateException();} catch (Throwable t) {node.waitStatus Node.CANCELLED;throw t;}}checkInterruptWhileWaiting方法在被唤醒后检查是否是被中断唤醒的如果是说明条件满足被signal唤醒了然后调用transferAfterCancelledWait将当前线程从条件队列转移到锁队列等候: private int checkInterruptWhileWaiting(Node node) {return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :0;}final boolean transferAfterCancelledWait(Node node) {// 将当前节点状态值设置为0if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {// 放入到锁队列尾部enq(node);// 此处返回true,表示是由于中断唤醒导致当前线程由条件队列转移到了锁队列中return true;}/** If we lost out to a signal(), then we cant proceed* until it finishes its enq(). Cancelling during an* incomplete transfer is both rare and transient, so just* spin.*/// 如果上面cas失败,说明SIGNAL操作和中断同时发生此时被中断唤醒的线程会不断轮询直到SIGNAL操作将节点成功转移到锁队列while (!isOnSyncQueue(node))Thread.yield();// 此处返回false,此时虽然发生了中断但是最终不是因为中断返回被唤醒而是signal操作率先完成了唤醒return false;}reportInterruptAfterWait方法负责判断当前是应该发生一次自我打断还是应该直接抛出中断异常: private void reportInterruptAfterWait(int interruptMode)throws InterruptedException {if (interruptMode THROW_IE)throw new InterruptedException();else if (interruptMode REINTERRUPT)selfInterrupt();}条件满足,唤醒等待的节点 当条件满足时调用signal方法唤醒条件队列中第一个节点对应的线程: public final void signal() {// 确保当前线程持有保护条件变量的互斥锁if (!isHeldExclusively())throw new IllegalMonitorStateException();// 唤醒条件队列中等待的所有线程Node first firstWaiter;if (first ! null)doSignal(first);}doSignal方法负责遍历条件队列中每个节点每遍历到一个节点先检查是否是被CANCEL了的节点如果是则跳过继续遍历下一个节点如果不是则将当前节点从条件队列转移到锁队列中并唤醒对应的线程然后结束返回: private void doSignal(Node first) {do {// 当前节点移出条件队列if ( (firstWaiter first.nextWaiter) null)lastWaiter null;first.nextWaiter null;} while (!transferForSignal(first) (first firstWaiter) ! null);}transferForSignal方法负责将当前节点加入锁队列中同时恢复挂起了的线程: final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/// 如果cas失败,说明当前Node已经被取消了if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 将当前节点加入锁队列尾部--返回旧的尾节点Node p enq(node);int ws p.waitStatus;// 将旧的尾节点的状态值设置为SIGNAL因为其存在新的后继节点需要唤醒if (ws 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))// 恢复挂起的线程LockSupport.unpark(node.thread);return true;}小总结 条件变量的实现这块需要将await和signal两部分连起来看才能看明白这里我给出一副流程图来帮助大家理解等待唤醒过程首先来看一下await的过程: 虽然await方法有很多代码但是大部分代码都服务于唤醒后的处理操作。 下面来看一下signal的整个过程: 后续的流程就是互斥模式下阻塞等待锁的流程了当t3线程抢到锁后会判断在锁队列上阻塞等待期间是否发生了中断如果是则会补充一次自我中断。 上面描述的是通过signal唤醒等待中的线程还有一种是线程阻塞过程中被打断了此时可能会发生如下两种情况: 线程被中断唤醒后尝试CAS更新节点状态从CONDITION到0成功此时interruptMode会被设置THROW_IE标记 当后续线程成功获取到锁被唤醒后发现interruptMode被设置了THROW_IE标记会抛出中断异常表明此次返回并非条件为真返回而是因为中断唤醒返回。 线程被中断唤醒后尝试CAS更新节点状态从CONDITION到0失败说明SIGNAL操作和中断同时发生此时被中断唤醒的线程会不断轮询直到SIGNAL操作将节点成功转移到锁队列 后续的流程就是互斥模式下阻塞等待锁的流程了当t3线程抢到锁后会补充一次自我中断。此时虽然发生了中断但是最终不是因为中断返回被唤醒而是signal操作率先完成了唤醒但是等待期间发生了中断所以补充了一次自我中断。 小结 Java AQS抽象队列同步器的大致设计思路也就讲述的差不多了后半部分由于写作时间比较紧张所以可能越到后面讲的就越草率了如果有没看懂的地方或者发现笔者写作错误可以在评论区指出或者私信与我讨论。
http://www.dnsts.com.cn/news/76009.html

相关文章:

  • 网站的产品上传图片网站制作成功案例
  • 竹妃怎么在公众号里做网站互动平台领手机
  • 最好记得网站域名南京建设网站公司
  • 易语言可以做网站后端淘宝网站的建设目标
  • 自己建立网站后怎么做淘客张家明做网站
  • 工商网站官网入口海口建设公司网站
  • 网站首页上海网站建设公司设计网页公司哪里好
  • 国外优秀的html5网站wordpress多站点demo
  • html5高端网站建设织梦模板下载wordpress 301 插件
  • 网站架构包括哪些我是怎么做网站架构的
  • 企业网站建设销售话术网页打不开但是微信和qq都可以上
  • 网站制作的服务机构做乡镇网站
  • 深圳商城网站开发什么网站可以做试卷
  • 手机网站制作明细报价表我的网站百度找不到了
  • 芜湖网站开发公司小型企业网站设计
  • 知名的建站公司win7不能运行wordpress
  • 电商型企业网站建设网站建设网站建设教程
  • 网站选项怎么做Wordpress 百度云存储
  • 专业的建站公司服务wordpress每页不显示文章
  • 珠海品牌网站建设深圳龙华新区属于什么区
  • 网上商城怎么推广企业网站seo报价
  • 在线搭建网站网站建设的可研设计报告
  • 做网站后有人抢注关键词免费考研论坛
  • 菜单网站图片素材建设网站 宣传平台
  • 网站服务内容 备案qq刷赞网站推广全网
  • 省级建设网站建立个人网站服务器
  • 网站建设 图片问题找能做网站的
  • 湖北手机网站建设徐州有哪些做网站
  • 公司想做网站费用要多少钱怀化人社网站
  • 品牌电商网站设计杭州黑马程序员培训机构怎么样