AQS: 抽象的队列同步器:
是JUC的基石,是构建锁(ReentrantLock和ReentrantReadWriteLock)和其他同步器组件(Semaphore,CountDownLatch,CyclicBarriar)的基础框架;
抽象: 我们要使用AQS,需要继承AQS并重写诸如
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
等抽象方法;
队列: AQS抽象类中维护了一个CLH的变种,即双向的先进先出的队列,用head,tail记录队首和队尾元素,元素的类型为Node,将暂时获取不到锁的线程封装为Node节点加入到队列中;
同步器: AQS中定义了一个int型的被volatile修饰的变量,可以通过getState,setState,compareAndSetState函数修改其值;线程同步的关键就是对state进行操作,根据state是否属于一个线程,操作state的方式分为独占式和共享式
对于ReentrantLock来说,state可以用来表示获取锁的可重入次数
对于ReentrantReadWriteLock,state的高16位表示获取读锁的次数,低16位表示获取到写锁的线程的可重入次数
对于Semaphore来说,state用来表示当前可用信号的个数
对于CountDownLatch来说,用来表示计数器当前的值
AQS的组成
(红色线代表是内部类)
AQS中还有一个ConditionObject内部类
Node节点
AQS在独占方式
下获取和释放资源使用的方法是:
void acquire(int arg)
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
void acquireInterruptibly(int arg)
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}
boolean release(int arg)
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
其中tryAcquire和tryRelease方法需要由具体的子类来实现,这里是模板方法的体现
可以发现,AQS中该方法是直接抛出异常的
在共享方式
下获取和释放资源使用的方法为:
void acquireShared(int arg)
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
public final void acquireSharedInterruptibly(int arg)
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
public final boolean releaseShared(int arg)
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
我们知道ReentrantLock具有
可重入
可打断
公平和非公平
条件变量
可超时等特性
可以看到ReentrantLock中维护了Sync,FairSync,NonFairSync3个内部类
以及他们的继承关系Sync继承AQS,FairSync,NonFairSync继承Sync
默认创建的是非公平锁
当调用Reentrant的ock方法时,实际上是调用sync的lock方法,sync进而调用fairSync或者NonFairSync的lock方法
public void lock() {sync.lock();
}
这里可以发现公平锁与非公平锁的第一个区别:
非公平锁: 调用lock时,首先就会执行一次CAS,如果失败,才会进入acquire方法
公平锁: 不会进行CAS抢占锁,直接进入acquire方法
具体tryAcquire方法交给子类来实现
可以明显看出公平锁与非公平锁的第二个区别在于tryAcquire方法**
就在于公平锁在获取同步锁时多了一个限制条件:hasQueuedPredecessors()
hasQueuedPredecessors是判断等待队列中是否存在有效节点
的方法
如果h == t,说明当前队列为空,就直接返回false,那么公平锁发现队列中没有人,那我就去获取锁
如果h != t, 也就是head != tail
------- > 如果h != t 并且 s ==null,说明有一个元素将要作为AQS的第一个节点,返回true
--------> 如果h != t 并且 s != null 以及 s.thread != Thread.currentThread(),则说明,队列里面已经有节点了,但是不是当前线程,则返回true
---------->如果h != t 并且 s != null 以及 s.thread == Thread.currentThread(),说明队列中的节点是当前线程,那么返回false(这代表着锁重入!!!)
所以总结一下: 只有队列为空
或者当前线程节点是AQS中的第一个节点
,则返回false,代表后续可以去获取锁;
否则其他情况都表示,已经有前驱节点,返回true代表后续不能获取锁!!
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;// h != t 时表示队列中有 Nodereturn h != t &&(// (s = h.next) == null 表示队列中还有没有老二(s = h.next) == null ||// 或者队列中老二线程不是此线程s.thread != Thread.currentThread());
}
**`从上面的分析可知:
所谓的可重入的特性,就是这里实现的
) public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
先来看看addWaiter()
private Node addWaiter(Node mode) {// 把当前线程封装为一个节点, mode传入的是EXCLUSIVE代表独占模式Node node = new Node(Thread.currentThread(), mode);//把tail赋值给predNode pred = tail;//刚开始,taii == head == pred == nullif (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) { = node;return node;}}//把当前节点入队enq(node);return node;}
enq方法
private Node enq(final Node node) {//循环for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
第一次循环:
由于t == null 创建一个哨兵节点
注意: 这里没有只是new Node(),创建了一个哨兵节点!!!
,因此进入compareAndSetHead(new Node()
,把head的引用指向哨兵节点,然后tail = head,tail也指向哨兵结点
private final boolean compareAndSetHead(Node update) {return unsafepareAndSwapObject(this, headOffset, null, update);}
示意图如下:
创建好哨兵节点后,进入第二次循环
此时才开始把node节点入队,上面只是设置哨兵节点
node.prev设置node节点的前驱节点为哨兵节点,然后改变tail的引用指向node节点
private final boolean compareAndSetTail(Node expect, Node update) {return unsafepareAndSwapObject(this, tailOffset, expect, update);}
至此,addWaiter方法执行结束,返回node节点,进入acquireQueued()
acquireQueued()方法: acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//得到node的前驱节点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {//这里个人感觉不会执行,有大佬知道请评论区指点!!if (failed)cancelAcquire(node);}}private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}
shouldParkAfterFailedAcquire(p, node)方法: 该方法会尝试让当前的节点的前驱节点的waitStatus设置为-1,
该方法接收的参数: 是node节点
和node节点的前驱节点
这里会涉及到Node节点中waitStatus的状态值,列举出来,共有4种状态,默认是0
/** waitStatus value to indicate thread has cancelled *///表示线程已经取消,就是线程在队列中等待过程中,取消等待(老子不等了!!!)static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needs unparking *///把当前线程置位-1, 表示需要唤醒(unpark)后继节点(线程)static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition *///当前线程因条件不满足,在条件变量(ConditionObject)的等待队列中static final int CONDITION = -2;/*** waitStatus value to indicate the next acquireShared should* unconditionally propagate*/static final int PROPAGATE = -3;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//或得前驱节点的状态int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {//跳过取消的前驱节点node.prev = pred = pred.prev;} while (pred.waitStatus > 0); = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
如果前驱节点的状态已经是-1 .则返回true
如果前驱节点的状态为1 (>0),则表明前驱节点被取消,则跳过前驱节点,直到找到一个状态 <= 0的,然后把前驱的节点的next置位node,然后返回到外层循环
如果前驱节点的状态不是以上两种.,意味着只有waitStatus为0(默认值)和-3的能到这条分支,那么就执行compareAndSetWaitStatus方法,把前驱节点的waitStatus置位-1(代表前驱节点有义务唤醒后继节点)
;
如果以上3种情况都不走,返回false,返回到acquireQueued中继续循环尝试
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {return unsafepareAndSwapInt(node, waitStatusOffset,expect, update);}
注意: park()被打断时,不会清除打断标记,sleep被打断会清除打断标记
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);//打断时,清除打断标记return Thread.interrupted();}
因此这里使用Thread.interrupted()方法,除了判断线程是否被打断外,还会清除打断标记
好,峰回路转,来到最初的地方
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
执行完,acquireQueued方法后,如果返回true,代表线程被打断,则会执行selfInterrupt(),再次将自己中断!!!
所谓的不可打断就是在这里实现的!!!
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
(即如果使用lock方法加锁,在等待过程中被打断,那么他还是会去争抢锁,如果抢锁成功,然后返回打断标记(打断标记是只有抢锁成功才会一同返回!!),然后会自我中断
static void selfInterrupt() {Thread.currentThread().interrupt();}
那么可打断是怎样实现的呢???
当我们调用Reentrant中的lockInterruptibly()方法时,会调用AQS中的acquireInterruptibly
如果其他线程调用了当前线程的interrupt方法,则当前线程会抛出InterruptedException,然后返回
如果没有被打断,那么tryAcquire()尝试获取锁,如果获取锁失败,然后进入doAcquireInterruptibly(arg)方法
在 park 过程中如果被 interrupt 会进入此, 这时候抛出异常, 而不会再次进入 for (;;)
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
public void unlock() {lease(1);}
会调用AQS中的release方法
1.首先tryRelease进行释放锁,由于可重入锁的缘故,只有当state-- 为0,才会返回true,那么此时进入if块
2.判断当前队列是否有结点,并且该节点的waitStatus是否为-1,如果两者都满足,则进入unparkSuccessor唤醒后继节点
public final boolean release(int arg) {// 尝试释放锁if (tryRelease(arg)) {// 队列头节点 unparkNode h = head;if (// 队列不为 nullh != null &&// waitStatus == Node.SIGNAL 才需要 unparkh.waitStatus != 0) {// unpark AQS 中等待的线程,后继节点 进入 ㈡unparkSuccessor(h);}return true;} return false;
}
tryRelease方法同样需要子类实现,但是公平和非公平锁的释放锁是一样!
protected final boolean tryRelease(int releases) {// state--int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}
假设此时状态如下所示:释放锁的线程已经把state置位0,setExclusiveOwnerThread(null)为null,接下来需要唤醒阻塞队列中的线程!
首先获取头结点的waitStatus,尝试重置为0.允许失败
该方法找到队列中离 head 最近的一个 Node(没取消的)
,unpark 恢复其运行,
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处//参数是头结点private void unparkSuccessor(Node node) {// 如果状态为 Node.SIGNAL 尝试重置状态为 0// 不成功也可以int ws = node.waitStatus;if (ws < 0) {compareAndSetWaitStatus(node, ws, 0);}// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的Node s = ;// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点//找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
}
unpark该线程后,线程会回到当初acquireQueued中park的地方继续去获取锁
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//得到node的前驱节点final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;// 还是需要获得锁后, 才能返回打断状态return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {//这里个人感觉不会执行,有大佬知道请评论区指点!!if (failed)cancelAcquire(node);}}private final boolean parkAndCheckInterrupt() {LockSupport.park(this);//打断时,清除打断标记return Thread.interrupted();}
先看一下tryLock方法
public boolean tryLock() {fairTryAcquire(1);}final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
发现: tryLock不会引起当前线程阻塞,获取不到锁就立即返回返回false了
由于tryLock调用的是nonfairTryAcquire(1),所以使用的是非公平策略!!
下面是设置了超时时间的tryLock,如果超时时间到,没有获取到锁,则返回false
public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {AcquireNanos(1, Nanos(timeout));
}public final boolean tryAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}/*** The number of nanoseconds for which it is faster to spin* rather than to use timed park. A rough estimate suffices* to improve responsiveness with very short timeouts.*/static final long spinForTimeoutThreshold = 1000L;private boolean doAcquireNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}
}
先回忆一下条件变量的使用:
static ReentrantLock lock = new ReentrantLock();static Condition waitCigaretteQueue = wCondition();static Condition waitbreakfastQueue = wCondition();static volatile boolean hasCigrette = false;static volatile boolean hasBreakfast = false;public static void main(String[] args) {new Thread(() -> {try {lock.lock();while (!hasCigrette) {try {waitCigaretteQueue.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("等到了它的烟");} finally {lock.unlock();}}).start();new Thread(() -> {try {lock.lock();while (!hasBreakfast) {try {waitbreakfastQueue.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("等到了它的早餐");} finally {lock.unlock();}}).start();sleep(1);sendBreakfast();sleep(1);sendCigarette();}private static void sendCigarette() {lock.lock();try {log.debug("送烟来了");hasCigrette = true;waitCigaretteQueue.signal();} finally {lock.unlock();}}private static void sendBreakfast() {lock.lock();try {log.debug("送早餐来了");hasBreakfast = true;waitbreakfastQueue.signal();} finally {lock.unlock();}}输出
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟
一个锁对应一个AQS阻塞队列,可以对应多个条件变量,每个条件变量有自己的一个条件队列
开篇我们提到过AQS中有一个内部类ConditionObject
我们在wCondition()的时候,其实是new了一个在AQS内部声明的ConditionObject对象;
需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象,需要由AQS的子类来提供newConditionObject函数
await流程
假设此时Thread-0持有锁,调用await,进入ConditionObject的addConditionWaiter流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (Waiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {//所有已经取消的Node从队列删除unlinkCancelledWaiters();t = lastWaiter;}// 创建一个关联当前线程的新 Node, 添加至队列尾部Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = Waiter = node;lastWaiter = node;return node;
}private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;while (t != null) {Node next = t.nextWaiter;if (t.waitStatus != Node.CONDITION) {t.nextWaiter = null;if (trail == null)firstWaiter = Waiter = next;if (next == null)lastWaiter = trail;}elsetrail = t;t = next;}}
接下来进入AQS的fullyRelease流程,释放同步器上的锁(如果是可重入的,需要都释放掉)
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
}private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = ;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
然后阻塞Thread-0
signal流程
假设 Thread-1 要来唤醒 Thread-0.此时调用signal方法,然后调用doSignal方法
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);
}什么情况transferForSignal(first)会不成功??
即该线程在等待过程中被打断或超时就会放弃对该锁的获取,那就没必要再添加到队列尾部private void doSignal(Node first) {do {//已经是尾结点了if ( (firstWaiter = Waiter) == null)lastWaiter = Waiter = null;// 将等待队列中的 Node 转移至 AQS 队列, 不成功且还有节点则继续循环} while (!transferForSignal(first) &&// 队列还有节点(first = firstWaiter) != null);}
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的
waitStatus 改为 -1
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/如果状态已经不是 Node.CONDITION, 说明被取消了if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/加入 AQS 队列尾部Node p = enq(node);int ws = p.waitStatus;if (// 上一个节点被取消ws > 0 ||// 上一个节点不能设置状态为 Node.SIGNAL!compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {// unpark 取消阻塞, 让线程重新同步状态LockSupport.unpark(node.thread);}return true;}
本文发布于:2024-01-28 06:05:25,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17063931315335.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |