让我们把共享锁与独占锁的函数名都列出来看一下:
独占锁 | 共享锁 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
tryAcquireNanos(int arg, long nanosTimeout) | tryAcquireSharedNanos(int arg, long nanosTimeout) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
acquireInterruptibly(int arg) | acquireSharedInterruptibly(int arg) |
doAcquireInterruptibly(int arg) | doAcquireSharedInterruptibly(int arg) |
doAcquireNanos(int arg, long nanosTimeout) | doAcquireSharedNanos(int arg, long nanosTimeout) |
release(int arg) | releaseShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
- | doReleaseShared() |
从上表可以看到,共享锁的函数是和独占锁是一一对应的,而且大部分只是函数名加了个Shared,从逻辑上看也是很相近的。
而doReleaseShared没有对应到独占锁的方法是因为它的逻辑是包含了unparkSuccessor,是建立在unparkSuccessor之上的,你可以简单地认为,doReleaseShared对应到独占锁的方法是unparkSuccessor。最主要的是,它们的使用时机不同:
abstract static class Sync extends AbstractQueuedSynchronizer {Sync(int permits) {setState(permits);}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}static final class NonfairSync extends Sync {protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}static final class FairSync extends Sync {protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}
共享锁获取锁有一下三种情况:
1、如果返回值大于0,说明获取共享锁成功,并且后续获取也可能获取成功。
2、如果返回值等于0,说明获取共享锁成功,但后续获取可能不会成功。
3、如果返回值小于0,说明获取共享锁失败,无锁可用。
公平版获取锁步骤:
接下来我们谈谈共享锁的tryAcquireShared和独占锁的tryAcquire的不同之处:
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
最后再看tryReleaseShared的实现,也用到了自旋操作,因为完全有可能多个线程同时释放共享锁,同时调用tryReleaseShared,所以需要用自旋保证 共享锁的释放最终能体现到同步器的状态上去。另外,除非int型溢出,那么此函数只可能返回true。
其实共享锁就是考虑可能多个线程可能同时获取锁或同时释放锁,只要是有机会就一定会去尝试。
public final void acquireShared(int arg) {1、尝试获取共享锁if (tryAcquireShared(arg) < 0)2、获取失败进入doAcquireShared(arg);}
接下来看看doAcquireShared方法的逻辑,它对应到独占锁是acquireQueued,除了上面提到的两件事,它们其实差别很少:
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED); //这件事放到里面来了boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) { //前驱是head时,才尝试获得共享锁int r = tryAcquireShared(arg);if (r >= 0) { //获取共享锁成功时,才进行善后操作setHeadAndPropagate(node, r); //独占锁这里调用的是 = null; if (interrupted)selfInterrupt(); //这件事也放到里面来了failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head;1、将当前node设置为head setHead(node);2、注意此时的h还是之前的head,不是nodepropagate 标识state剩余的个数h.waitStatus 此时可能为0、 SIGNAL、 PROPAGATEif (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = ;if (s == null || s.isShared())doReleaseShared();}}private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}
setHead函数只是将刚成为将成为head的节点变成一个dummy node。而setHeadAndPropagate里也会调用setHead函数。但是它在一定条件下还可能会调用doReleaseShared,看来这就是单词Propagate的由来了,也就是我们一直说的“如果一个线程刚获取了共享锁,那么在其之后等待的线程也很有可能能够获取到锁”。
1、入参node所代表的线程一定是当前执行的线程,propagate则代表tryAcquireShared的返回值,由于有if (r >= 0)的保证,propagate必定为>=0,这里返回值的意思是:如果>0,说明我这次获取共享锁成功后,还有剩余共享锁可以获取;如果=0,说明我这次获取共享锁成功后,没有剩余共享锁可以获取。
2、Node h = head; setHead(node);执行完这两句,h保存了旧的head,但现在head已经变成node了。
3、h == null和(h = head) == null和s == null是为了防止空指针异常发生的标准写法,但这不代表就一定会发现它们为空的情况。这里的话, h == null和(h = head) == null 是不可能成立,因为只要执行过addWaiter,CHL队列至少也会有一个node存在的;但s == null是可能发生的,比如node已经是队列的最后一个节点。
4、看第一个if的判断:
4.1、如果propagate > 0成立的话,说明还有剩余共享锁可以获取,那么短路后面条件。
4.2、中间穿插一下doReleaseShared的介绍:它不依靠参数,直接在调用中获取head,并在一定情况unparkSuccessor这个head。但注意,unpark head的后继之后,被唤醒的线程可能因为获取不到共享锁而再次阻塞(见上一章的流程分析)。
4.3、如果propagate = 0成立的话,说明没有剩余共享锁可以获取了,按理说不需要唤醒后继的。也就是说,在这种情况下因为h.waitStatus<0 ,调用doReleaseShared,会造成acquire thread 不必要的唤醒。之所以说不必要,是因为唤醒后因为没有共享锁可以获取而再次阻塞了。
4.4、继续看,如果propagate > 0不成立,而h.waitStatus < 0成立。这说明旧head的status<0。但如果你看doReleaseShared的逻辑,会发现在unparkSuccessor之前就会CAS设置head的status为0的,在unparkSuccessor也会进行一次CAS尝试,因为head的status为0代表一种中间状态(head的后继代表的线程已经唤醒,但它还没有做完工作),或者代表head是tail。而这里旧head的status<0,只能是由于doReleaseShared里的compareAndSetWaitStatus(h, 0, Node.PROPAGATE)的操作,而且由于当前执行setHeadAndPropagate的线程只会在最后一句才执行doReleaseShared,所以出现这种情况,一定是因为有另一个线程在调用doReleaseShared才能造成,而这很可能是因为在中间状态时,又有人释放了共享锁。propagate == 0只能代表当时tryAcquireShared后没有共享锁剩余,但之后的时刻很可能又有共享锁释放出来了。
5、继续看,如果propagate > 0不成立,且h.waitStatus < 0不成立,而第二个h.waitStatus < 0成立。注意,第二个h.waitStatus < 0里的h是新head(很可能就是入参node)。第一个h.waitStatus < 0不成立很正常,因为它一般为0(考虑别的线程可能不会那么碰巧读到一个中间状态)。第二个h.waitStatus < 0成立也很正常,因为只要新head不是队尾,那么新head的status肯定是SIGNAL。所以这种情况只会造成不必要的唤醒。
6、看第二个if的判断:
public final boolean releaseShared(int arg) {1、尝试释放锁,if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;
}protected final boolean tryReleaseShared(int releases) {1、也是一个自循环for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
而共享锁的逻辑则直接调用了doReleaseShared,但在获取共享锁成功时,也可能会调用到doReleaseShared。
1、释放锁时,会调用
2、设置setHeadAndPropagate时,也会调用
两处会唤醒后继线程,提高了效率。
private void doReleaseShared() {for (;;) {Node h = head;1、队列至少存在两个节点if (h != null && h != tail) {int ws = h.waitStatus;1.1、waitStatus为SIGNAL,利用CAS设置为0,其实与unparkSuccessor方法配置,该方法也设置了0,只不过在这提前判断if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}1.2、ws为0,设置为PROPAGATE,直到下一次head被后继节点替换这期间都为PROPAGATEelse if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}2、head若发生变化,继续循环,看是否新的head是否可以唤醒后续节点if (h == head) // loop if head changedbreak;}
}
1、逻辑是一个死循环,每次循环中重新读取一次head,然后保存在局部变量h中,再配合if(h == head) break;,这样,循环检测到head没有变化时就会退出循环。注意,head变化一定是因为:acquire thread被唤醒,之后它成功获取锁,然后setHead设置了新head。而且注意,只有通过if(h == head) break;即head不变才能退出循环,不然会执行多次循环。
2、if (h != null && h != tail)判断队列是否 至少有两个node ,如果队列从来没有初始化过(head为null),或者head就是tail,那么中间逻辑直接不走,直接判断head是否变化了。
3、如果队列中有两个或以上个node,那么检查局部变量h的状态:
3.1、如果状态为SIGNAL,说明h的后继是需要被通知的。通过对CAS操作结果取反,将compareAndSetWaitStatus(h, Node.SIGNAL, 0)和unparkSuccessor(h)绑定在了一起。说明了只要head成功得从 SIGNAL修改为0 ,那么head的后继的代表线程肯定会被唤醒了。
3.2、如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于acquire thread获取锁失败再次设置head为SIGNAL并再次阻塞,要么由于acquire thread获取锁成功而将自己(head后继)设置为新head并且只要head后继不是队尾,那么新head肯定为SIGNAL。所以设置这种中间状态的head的status为PROPAGATE,让其status又变成负数(这样在setHeadAndPropagate可能检测到t.waitStatus<0而进入doReleaseShared方法),这样可能被唤醒线程(因为正常来讲,被唤醒线程的前驱,也就是head会被设置为0的,所以被唤醒线程发现head不为0,就会知道自己应该去唤醒自己的后继了) 检测到。
3.3、如果状态为PROPAGATE,直接判断head是否变化。
4、两个continue保证了进入那两个分支后,只有当CAS操作成功后,才可能去执行if(h == head) break;,才可能退出循环。
5、if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。
setHeadAndPropagate方法总结:
这个函数的难点在于,很可能有多个线程同时在同时运行它。比如你创建了一个Semaphore(0),让N个线程执行acquire(),自然这多个线程都会阻塞在acquire()这里,然后你让另一个线程执行release(N)。
1、此时 释放共享锁的线程,肯定在执行doReleaseShared。
2、由于 上面这个线程的unparkSuccessor,head后继的代表线程也会唤醒,进而执行doReleaseShared。
3、重复第二步,获取共享锁的线程 又会唤醒 新head后继的代表线程。
观察上面过程,有的线程 因为CAS操作失败,或head变化(主要是因为这个),会一直退不出循环。进而,可能会有多个线程都在运行该函数。
参考链接:
1、
2、
本文发布于:2024-01-31 00:37:33,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170663302524039.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |