【ConcurrentHashMap源码详细解析 jdk1.8版本 包括putVal、addCount、fullAddCount和transfer方法详解】

阅读: 评论:0

【ConcurrentHashMap源码详细解析 jdk1.8版本 包括putVal、addCount、fullAddCount和transfer方法详解】

【ConcurrentHashMap源码详细解析 jdk1.8版本 包括putVal、addCount、fullAddCount和transfer方法详解】

目录

一、ConcurrentHashMap介绍

二、ConcurrentHashMap初始化源码解析->难度:简单

 2.1 基本使用

 2.2 ConcurrentHashMap(int initialCapacity)方法

2.3 tableSizeFor(int c)方法

三、put方法源码解析->难度:简单

3.1 什么是CAS

3.2 初始化数组 

3.3 当前key对应的下标没有数据

3.4 当前key对应的下标有数据

四、addCount方法源码解析->难度:超难

4.1 size计算规则和类变量解释

4.2 类变量+1

4.3 数组扩容

五、transfer方法源码解析->难度:超难

5.1 基本知识解释

5.2 扩容源码

总结


前言

        通过写文章分析ConcurrentHashMap源码,提高自己对源码的了解,学习源码中优秀的设计思想,提高抽象思维能力,记录学习过程共同进步。想要提高自身的技术,不能放过阅读ConcurrentHashMap的机会,里面有太多的设计思想值得我们学习。

        再看ConcurrentHashMap源码的时候,要时时刻刻想着这是高并发场景,不要以平常的单线程场景来思考问题,要不然是看不懂的。

        分析有错误的地方,欢迎大佬指出,谢谢。


一、ConcurrentHashMap介绍

        ConcurrentHashMap是java的JUC包下的一个并发安全的集合,jdk1.7和jdk1.8的结构有很大的差异。

        jdk1.8结构为:数组+链表+红黑树,保证线程安全的前提下,提高了性能。有很多优秀的设计思想。

        jdk1.7采用数组+segment,用分段锁来保证线程安全,但是性能也有所下降。


二、ConcurrentHashMap初始化源码解析->难度:简单

 2.1 基本使用

import urrent.ConcurrentHashMap;public class ConcurrentHashMapTest {public static void main(String[] args) {//虽然传入的是17,但是它的容量真的是17吗?ConcurrentHashMap<Object, Object> chm = new ConcurrentHashMap<>(17);chm.put("a","b");}}

 2.2 ConcurrentHashMap(int initialCapacity)方法

/**
* 当前类:ConcurrentHashMap
* private static final int MAXIMUM_CAPACITY = 1 << 30;
*/
public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();// 如果设置的初始容量大于2^29,则设置为2^30,否则tableSizeFor方法计算初始容量大小int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));// 集合大小,也可以用来标识扩容临界值,在初始数组initTable方法中可见this.sizeCtl = cap;
}

2.3 tableSizeFor(int c)方法

tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1),这行相当于把传入的容量m乘以1.5,去掉小数,然后求比它大的第一个2^n。初始化容量必须是2^n。

/*** 当前类:ConcurrentHashMap* 17的二进制为00000000 00000000 00000000 00010001* 25的二进制为00000000 00000000 00000000 00011001* 29的二进制为00000000 00000000 00000000 00011101* 31的二进制为00000000 00000000 00000000 00011111* private static final int MAXIMUM_CAPACITY = 1 << 30;* 传入的是17,17*1.5=25.5,去掉小数=25,大于25的2^n为32* 接下来我们验证该方法的返回值* int c = 17+(17>>>1)+1 = 26 (17>>>1) = 00000000 00000000 00000000 00001000=8* int n = c - 1 = 25* n |= n >>> 1; 00000000 00000000 00000000 00011001|00000000 00000000 00000000 00001100* n = 00000000 00000000 00000000 00011101 = 1+4+8+16 = 29* n |= n >>> 2; 00000000 00000000 00000000 00011101|00000000 00000000 00000000 00000111* n = 00000000 00000000 00000000 00011111 = 1+2+4+8+16 = 31* n |= n >>> 4; 00000000 00000000 00000000 00011111|00000000 00000000 00000000 00000001* n = 00000000 00000000 00000000 00011111 = 1+2+4+8+16 = 31* n |= n >>> 8; 00000000 00000000 00000000 00011111|00000000 00000000 00000000 00000000* n = 00000000 00000000 00000000 00011111 = 1+2+4+8+16 = 31* n |= n >>> 16; 00000000 00000000 00000000 00011111|00000000 00000000 00000000 00000000* n = 00000000 00000000 00000000 00011111 = 1+2+4+8+16 = 31* return 31 < 0 ? 1 : (31 > 2^30) ? 2^30 : 31 + 1;* 和预测的一模一样*/
private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

三、put方法源码解析->难度:一般

        put的方法超级长,分为4步讲:

        3.1 什么是CAS

        3.2 初始化数组

        3.3 当前key对应的下标没有数据

        3.4 当前key对应的下标有数据

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;if (tab == null || (n = tab.length) == 0)tab = initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) { = new Node<K,V>(hash, key,value, null);break;}}}else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}addCount(1L, binCount);return null;
}

3.1 什么是CAS

        CAS:Compare And Swap 比较并替换

        基本含义:内存地址V,旧的预期值A,要修改的新值B

        场景:如果当前线程从V拿到x的值为A,进行了自己的一些列业务操作之后,如果此时V的值,还是A,就把V的值改为B,如果不是就不修改。

        一般使用CAS的场景都是个循环,假如这次更新失败,还是再次进入拿到V的值,再进行业务处理,然后再次更新,失败就再次循环。

        看了ConcurrentHashMap的源码,你会对CAS的更加熟悉。

3.2 初始化数组 

        第一个线程进入时,数组肯定为空,if (tab == null || (n = tab.length) == 0)返回true,进入initTable方法

/*** 当前类:ConcurrentHashMap* private static final int DEFAULT_CAPACITY = 16;* transient volatile Node<K,V>[] table; 当前map的元素集合* private transient volatile int sizeCtl; 初始化容量,或者扩容临界值,sizeCtl=32,因为我们 * 使用的是传入初始化容量的构造方法* */
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;// 不一定为true,因为在高并发下,可能其他后面来的线程,可能已经初始化过了,假设没有初始化while ((tab = table) == null || tab.length == 0) {// 如果初始化容量为负数,代表有其他线程正在初始化,注意它既是判断也是赋值操作if ((sc = sizeCtl) < 0)// 释放CPU时间片Thread.yield();// 如果没有其他线程初始化,如果内存中现在sizeCtl等于sc,CAS把sizeCtl的值改为-1else if (UpareAndSwapInt(this, SIZECTL, sc, -1)) {try { // CAS是原子操作,假如成功,其他线程进来就会yield,当前线程负责初始化if ((tab = table) == null || tab.length == 0) {//sc = sizeCtl = 32 ,所以n = 32int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];// 把新建的数组,赋值给table table = tab = nt; // sc = 32 - 32/4 = 24 扩容临界值>=24扩容sc = n - (n >>> 2);}} finally {// 前面提到,既是初始容量也是扩容临时值sizeCtl = sc;}break;}}// 返回数组return tab;
}

3.3 当前key对应的下标没有数据

        3.2执行完之后,数组初始化完成,当前线程又一次进入循环,会进行第2个判断条件,

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)

/*** 当前类:ConcurrentHashMap* tab为当前数组集合,在for (Node<K,V>[] tab = table;;),这里赋值给它的* if (key == null || value == null) throw new NullPointerException();* i = (n - 1) & hash) 就是对put的key的hash进行计算,得到他在hash表的下标位置,没有比较深究*/
// 把数组tab下标i处的value拿到赋值给f局部变量,如果f为null了,证明i下标没有值
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 如果i下标出为null,CAS赋值为新的k-v键值对的Node对象if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))//返回true,证明没有线程竞争这个下标,或者当前线程抢先赋值成功,就退出自旋,进入addCount对集合size进行设置或者扩容break;
}

3.4 当前key对应的下标有数据

         如果3.3计算出当前key对应的i下标处有元素。进行第3个判断条件,

else if ((fh = f.hash) == MOVED)

/*** 当前类:ConcurrentHashMap* static final int MOVED = -1;*/
// 如果key对应的i下标有元素,而且hash值为-1
// 证明有线程在扩容,当前节点已经被迁移走了,i节点的值已经变为:
// ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 后面会解析为什么
else if ((fh = f.hash) == MOVED)// 协助扩容,在解析扩容源码时讲解tab = helpTransfer(tab, f);

        如果没有线程正在扩容, 进行第4个判断条件,else {}

/*** 当前类:ConcurrentHashMap* static final int TREEIFY_THRESHOLD = 8;* static final int TREEBIN = -2;* 这次的文章不对红黑树进行解析,我也不太熟悉,只熟悉它的插入逻辑,删除直接看不懂,哈哈*/
else {V oldVal = null;
// f就是此时i下标处的节点,这次put的key和它所在下标一样,所以会形成链表synchronized (f) {
// 判断现在i处的节点,还是不是之前的节点。
// 高并发瞬息万变,可能的情况就是:当前线程还在这抢锁那,当前数组元素个数超过扩容临界值了,
// 其他线程都开始扩容了。
// 当前线程又往下走,if (binCount != 0) {},
// 肯定不满足,进入下次循环,又重头来了if (tabAt(tab, i) == f) {
// 代表是链表,红黑树hash是-2,TreeBin(TreeNode<K,V> b) {super(TREEBIN, null, null, null);}if (fh >= 0) { // 用于计算链表的长度binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// 如果hash相等并且地址值一样或者对象相等,就替换原来的valueif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;// 替换成功,结束该链表的循环,进入if (binCount != 0) {},// 退出整个map集合数组的循环,执行addCount方法break;}// 不相等,那就要追加到链表的尾节点了Node<K,V> pred = e;// 找到链表尾节点,把该次put的k-v包装为节点,追加到链表上面。if ((e = e.next) == null) { = new Node<K,V>(hash, key,value, null);// 追加结束该链表的循环,进入if (binCount != 0) {},//退出整个map集合数组的循环,执行addCount方法。break;}}}// 啥也不说了,就是不会,所以不解析,哈哈哈哈哈哈哈哈哈else if (f instanceof TreeBin) {  // 代表是红黑树Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 只要把数据加入map集合成功,这个肯定不为0if (binCount != 0) {// 链表长度>=8,转为红黑树,链表长度值不算刚才put的key,算的话就是大于8才转if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);// 这个有值,代表替换value值,直接returnif (oldVal != null)return oldVal;// 不是替换,那就退出map的数组的循环,执行addCount方法。break;}
}

四、addCount方法源码解析->难度:超难

        addCount的方法步长,但是有点难度,分为2步讲:

        4.1 size计算规则和类变量解释

        4.2 类变量+1

        4.3 数组扩容

/*** 当前类:ConcurrentHashMap* private transient volatile CounterCell[] counterCells;* private transient volatile long baseCount;*/ 
private final void addCount(long x, int check) {CounterCell[] as; long b, s;// 第一个if是用来设置map集合的size大小的,既数组元素个数,首先解析这个方法if ((as = counterCells) != null ||!UpareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {CounterCell a; long v; int m;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||(a = Probe() & m]) == null ||!(uncontended =UpareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}if (check <= 1)return;s = sumCount();}if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {int rs = resizeStamp(n);if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (UpareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}else if (UpareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount();}}
}

4.1 size计算规则和类变量解释


// CounterCell类,就一个volatile修饰的value属性,多线程可见
@sun.misc.Contended static final class CounterCell {volatile long value;CounterCell(long x) { value = x; }
}// counterCells就是CounterCell类数组的类变量,初始化时长度为2
private transient volatile CounterCell[] counterCells;// baseCount就是个long类型的类变量
private transient volatile long baseCount;

         为什么要介绍counterCellsbaseCount类变量,因为它们两个是用来计算map集合的size值的,具体的计算方法就是baseCount+counterCells(所有CounterCell类的value之和)。所以在addCount方法中,只要是对baseCount或者counterCells中的某一个CounterCell 对象value属性进行了+1操作,就代表完成了对size+1操作,map集合元素的对象数量就设置成功了。

        如果高并发严重,还会对counterCells进行2倍扩容处理,来提高性能,设计思想值得我们学习。

        在ConcurrentHashMap的设计中,为了保证安全并且提供高性能,在对map集合的元素个数统计方面也做了优化设计。将putaddCount没有放在一个原子操作里面,所以在高并发的情况下,得到size的大小可能不是准确的,但是size的数量是不是实时和map集合元素的个数一致,相对于map集合的性能和安全来说,重要程度低一点。

        计算size的操作请看下面的代码:

// 当前类:ConcurrentHashMap
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;
}

4.2 类变量+1

        在看了4.1之后,相信对你看addCount方法有很大的帮助,这也是为什么4.2标题叫类变量+1的原因,类变量指baseCount或者counterCells。

首先分析第一个if条件:

/*** 当前类:ConcurrentHashMap* private transient volatile CounterCell[] counterCells;* private transient volatile long baseCount;* 产生一个线程安全的随机值,用来随机得到counterCells的下标,线程不变getProbe返回值不会变化* int h = Probe() * 同一个线程getProbe值变得话,调用advanceProbe方法,以后在调用getProbe就会变了* h = ThreadLocalRandom.advanceProbe(h);* 详细介绍请看大佬文章:/*/// 将类变量counterCells赋值给局部变量f,提高性能,f在线程工作内存,counterCells在主内存
// 如果as不为空,进入方法,证明counterCells初始化完成,首次初始化数组大小为2,既两个CounterCell对象
// 如果as为空,证明没有初始化,拿到baseCount的值,进行CAS操作,如果baseCount没变,则+1
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||!UpareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {// 如果as不为空或者as为空并且CAS替换失败,就会到这里,证明并发很高CounterCell a; long v; int m;// 无竞争的boolean uncontended = true;// as没有初始化,进入fullAddCount方法// 或者已经初始化了,但随机得到counterCells的下标a处元素为null,进入fullAddCount方法// 假设Probe()为100// 初始化而且a下标有元素,CAS对元素的value+1,失败进入fullAddCount方法// uncontended为true代表:counterCells没有初始化,或者随机下标处为null// uncontended为false代表:随机下标处有元素CounterCell,但是CAS失败,并发高有竞争if (as == null || (m = as.length - 1) < 0 ||(a = Probe() & m]) == null ||!(uncontended =UpareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {fullAddCount(x, uncontended);return;}// 这种情况就是CAS对counterCells元素的某一个下标元素value+1成功了。如果check<=1,// 可能的情况就是这次put才做是覆盖原值,这种在putVal方法就return了// 还有可能,这次put之前,那个hash冲突的下标处就一个节点,加现在put的,一共两个 // 这种情况,链表元素不多,没必要扩容,超过扩容临界的概率不大,就直接return了if (check <= 1)return;s = sumCount(); // 计算size大小,已经看过这方法了
}

 进入fullAddCount方法:

这个方法超级超级难理解,高并发的场景太复杂了

/*** 当前类:ConcurrentHashMap* private transient volatile int cellsBusy;* cellsBusy类变量相等于一个锁,想要加锁的就cas把它从0改为1,其他线程从0改1就会失败* 失败的代表没有抢到锁,改成功就相当于抢到了* wasUncontended为true代表:counterCells没有初始化,或者随机下标处为null* wasUncontended为false代表:随机下标处有元素,即CounterCell,但是CAS失败,并发高有竞争* uncontended:非竞争的* wasUncontended:是不是非竞争*/
private final void fullAddCount(long x, boolean wasUncontended) {// 此时h还时100int h;if ((h = Probe()) == 0) {ThreadLocalRandom.localInit();      // force initializationh = Probe();wasUncontended = true;}boolean collide = false;                // True if last slot nonemptyfor (;;) {CounterCell[] as; CounterCell a; int n; long v;// 这个进入这个if方法,代表counterCells初始化完成,CAS不成功可能会引发扩容// 赋值加判断,不为空并且数组中有元素,进入if条件if ((as = counterCells) != null && (n = as.length) > 0) {// 计算的(n - 1) & h下标是否有元素,没有就新增一个if ((a = as[(n - 1) & h]) == null) {// 0代表没线程抢锁,最开始已经注释了// 不为0,代表其他线程抢去了,执行collide = false;// 然后执行 h = ThreadLocalRandom.advanceProbe(h);// 这时又得到一个随机值h,假设h变为200了,进入下次循环if (cellsBusy == 0) { CounterCell r = new CounterCell(x); // 既然没有就new一个,x为1// 再次判断是否为0,高并发,不为0可能性很大,为0就CAS改为1,代表抢到锁if (cellsBusy == 0 &&UpareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try {// 抢到,先把当前内存的counterCells赋值给局部变量rs,因为高并发// 可以别的线程扩容了,如果此时rs不为null且数组中有元素,// 随机得到一个下标,下标处为null,就设置new的r,// 此时类变量+1这个目标就达到了CounterCell[] rs; int m, j;if ((rs = counterCells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0; // 释放锁}if (created) break; // 退出fullAddCount方法,判断要不要扩容continue;  // Slot is now non-empty 当前线程又没+1成功,继续循环}}collide = false;}// 假如(a = as[(n - 1) & h]) != null,则进入else if条件// wasUncontended为false,有线程竞争,把wasUncontended改为true,// 执行,h = ThreadLocalRandom.advanceProbe(h);重置随机值// 再次进入循环// 证明这个下标高并发很严重,需要重置去其他下标CASelse if (!wasUncontended)       // CAS already known to failwasUncontended = true;      // Continue after rehash// 如果随机下标有元素就去CAS,成功就结束fullAddCount方法,失败就继续判断else if (UpareAndSwapLong(a, CELLVALUE, v = a.value, v + x))break;// 证明被扩容了,地址值不一样了else if (counterCells != as || n >= NCPU)collide = false;            // At max size or stale// 设置为true,证明之前的null槽了,不为空了else if (!collide)collide = true;// 抢锁扩容else if (cellsBusy == 0 &&UpareAndSwapInt(this, CELLSBUSY, 0, 1)) {try {//内存的counterCells还和as一样的话,才扩容,要不然代表已经扩了if (counterCells == as) {// Expand table unless stale//二倍扩容并迁移数据CounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];// 替换counterCells的值counterCells = rs;}} finally {cellsBusy = 0; // 释放锁}collide = false;continue; // 继续循环,类变量+1动作还没有完成}// 重置随机数h = ThreadLocalRandom.advanceProbe(h);}// 这个else if方法,代表counterCells没有初始化,进行加锁并初始化else if (cellsBusy == 0 && counterCells == as &&UpareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try {                           // Initialize tableif (counterCells == as) {// 默认初始化数组长度2CounterCell[] rs = new CounterCell[2];rs[h & 1] = new CounterCell(x); // 完成类变量+1counterCells = rs; // 给counterCells的赋值init = true;}} finally {cellsBusy = 0;}if (init)break; // 结束了,退出fullAddCount方法,判断要不要扩容}// 进入这个else if方法,代表给counterCells的任意下标CounterCell对象,// value+1没有成功,尝试给baseCount+1,也失败,只能继续循环了else if (UpareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))break;                          // Fall back on using base}
}

4.3 数组扩容

        进入addCount方法的第2个判断条件。

/*** 当前类:ConcurrentHashMap* 简单介绍,标题5详解* s为上面第1个if条件计算的size值* private static final int MAXIMUM_CAPACITY = 1 << 30;* private transient volatile int sizeCtl; 初始化容量,或者扩容临界值* private static int RESIZE_STAMP_BITS = 16;* private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;* private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;*/
if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// size值大于等于扩容临界值,就扩容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// 生成一个协助扩容的辅助值int rs = resizeStamp(n);// 协助扩容线程if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (UpareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 主扩容线程else if (UpareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount(); // 计算size大小}
}

五、transfer方法源码解析->难度:超难

        扩容的设计思想也有很多值得学习的地方,多线程扩容,判断节点要不要迁移的链表分段思想等等。先解释并画图,再解析源码。

if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// size值大于等于扩容临界值,就扩容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// 生成一个协助扩容的辅助值,00000000 00000000 10000000 00011010int rs = resizeStamp(n);// 协助扩容线程if (sc < 0) {if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break;if (UpareAndSwapInt(this, SIZECTL, sc, sc + 1))transfer(tab, nt);}// 主扩容线程else if (UpareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);s = sumCount(); // 计算size大小}
}

5.1 基本知识解释

        先解释resizeStamp方法,得到 rs的值00000000 00000000 10000000 00011010

/*** private static int RESIZE_STAMP_BITS = 16;* numberOfLeadingZeros得到一个int值的二进制从左到右,连续0的个数* n为数组长度32,二进制 00000000 00000000 00000000 00100000,左边连续26个0* Integer.numberOfLeadingZeros(32) | (1 << 15) = 32794*/static final int resizeStamp(int n) {// 26 | (1 << 15) // 00000000 00000000 00000000 00011010 | 00000000 00000000 10000000 00000000// 00000000 00000000 10000000 00011010return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}// 利用位运算结合二分法,得到i的从左到右连续0的个数
public static int numberOfLeadingZeros(int i) {// HD, Figure 5-6if (i == 0)return 32;int n = 1;if (i >>> 16 == 0) { n += 16; i <<= 16; }if (i >>> 24 == 0) { n +=  8; i <<=  8; }if (i >>> 28 == 0) { n +=  4; i <<=  4; }if (i >>> 30 == 0) { n +=  2; i <<=  2; }n -= i >>> 31;return n;
}

        接下来将解释怎么判断,当前的节点要不要迁移。

        如有两个keyhash为:1,33。在32长度数组上,它们与32进去取模,都在下标1的位置,但是扩容为64长度之后哪?1对应的key肯定不用迁移,但是33对应的key就要迁移到1+32=33的位置。这是十进制我们可以很快算出。

        但是用二进制怎么判断它要不要迁移,请看下图:

图1

        得出的结构就是,如果对原来的数组进行扩容,判段该节点要不要迁移,只要把该节点的keyhash与数组长度n,进行&运算,为0代表不需要扩容,不为0代表需要扩容。 


        接下来解释多线程扩容,如果数组长度是32,则两个线程扩容,线程1负责:16-31,线程2负责:0-15,为什么是这样,源码解释

/*** NCPU:CPU核心数* private static final int MIN_TRANSFER_STRIDE = 16;* 如果只有一个cpu,就一个线程扩容,stride为数组长度n* stride值最小为16,如果n等于16,就一个线程扩容0-15。n等于32就两个线程0-15,16-31* 现在说这些可能不太理解,看源码就知道*/
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE){stride = MIN_TRANSFER_STRIDE;
}

图2


        还的解释一下,如果是链表的话,有一个非常有意思的设计,可以提高性能。会把老链表的元素分为,需要扩容的hn新链表,不需要扩容的ln新链表。Node<K,V> ln, hn;

        fh节点hash,n为扩容去数组长度

        链表开始循环,第一个节点以后的int runBit = fh & n不变,就继续循环,如果循环到一个节点pp.hash & n,与之前runBit 不一样了,就把该节点赋值给lastRun保存起来,runBit 也变为当前节点的p.hash & n的值,继续循环直到遍历完所有的节点。

        最后判断runBit=0的话,lastRun赋值给ln,否则把lastRun赋值给hn。优秀的设计,就是为了提高新能。Node<K,V> lastRun = f;

        请看下图,假如很多keyhash值为133

         从下图可以总结出,D节点之后的都是需要迁移的,所以把D赋值给hn,循环迁移链表的时候,到D节点就代表迁移完了。

        之后会形成lnhn新链表,然后把它们迁移到新的64长度数组

图3

5.2 扩容源码

        上面的解释看多了之后,肯定还是很懵的,但是在结合源码,你就会理解了。        

        首先详细解释一下,第二个if条件。

/*** private static int RESIZE_STAMP_BITS = 16;* private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;* private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;*/
if (check >= 0) {Node<K,V>[] tab, nt; int n, sc;// size值大于等于扩容临界值,就扩容while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) {// 生成一个协助扩容的辅助值,00000000 00000000 10000000 00011010// 这个已经再5.1标题解释过了,我们现在的map容量是32,这个rs就是上面那个int rs = resizeStamp(n);// 协助扩容线程,sc=24,当前线程1,肯定进入else ifif (sc < 0) {// 具体的位运算,判断要不要扩容,可以自己计算if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)break; // 不需要协助扩容了// 协助的线程2,把sizeCtl从10000000 00011010 00000000 00000010// 变为 10000000 00011010 00000000 00000011if (UpareAndSwapInt(this, SIZECTL, sc, sc + 1))// tab原来的32长度数组,nt现在的64长度数组transfer(tab, nt);}// 主扩容线程,扩容线为:线程1,把sizeCtl从24改为,rs 左移16位再加2// 变为 10000000 00011010 00000000 00000010为负数// 这个主线程加2,协助线程加1,没有必要纠结为什么else if (UpareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))// tab原来的32数组transfer(tab, null);s = sumCount(); // 计算size大小}
}

         进入transfer方法,首先查看if (nextTab == null) {},初始化方法

/*** 当前类:ConcurrentHashMap*/
// 为空就初始化一个新的数组
if (nextTab == null) {            // initiatingtry {@SuppressWarnings("unchecked")// 原来的二倍,32*2=64Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}// 赋值给nextTable,线程2进来时,就可以得到这个new的数组 nextTable = nextTab;// 很有用,原数组长度transferIndex = n;
}// 所以已经迁移过的街道,会被设置为
// ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// hash为MOVED=-2
ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null);Table = tab;
}

        接下来看for循环,里面的抽象地方要自己好好想想,不好形容。 

/*** 当前类:ConcurrentHashMap* NCPU CPU核心数* MIN_TRANSFER_STRIDE 16 */private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 这个感兴趣就研究吧,很简单,现在原数组为32,stride=16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide rangeif (nextTab == null) {            // initiating// 解析过了 代码删了,方法太长了}// 64新数组长度int nextn = nextTab.length;// 迁移过的用这个,替换ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);boolean advance = true;boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 这个循环就是切分原数组,让两个线程同时扩容的(对于32长度是两个)// 假如主扩容线程1进来,线程1while方法结束后,协助扩容线程2再进来while (advance) {int nextIndex, nextBound;// 线程1不满足,线程2不满足if (--i >= bound || finishing)advance = false;// 线程1不满足,线程2不满足else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}// 线程1满足,线程1把类变量nextIndex设置为16,原来是32
//  (线程1while方法执行完,线程2执行到这)线程2满足,线程2把类变量nextIndex设置为0,原来16else if (UpareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {// 线程1的bound=16// 线程2的bound=0bound = nextBound;// 线程1的i=31// 线程2的i=15i = nextIndex - 1;advance = false;}}// 假如线程1执行完while方法,线程2进来执行完while方法,就会形成图2// 线程12就会,从后往前,迁移数据// 线程1负责31-16,线程2负责15-0,迁移数据,其他线程进来,发现nextIndex为0// 就不会再切分原数组了,检查有没有漏掉的就行了if (i < 0 || i >= n || i + n >= nextn) { // 这个方法不好描述,只能自己想象了// 这个方法就是判断是否扩容完的int sc;if (finishing) { // 在这把table替换为新的// 这个就代表替换的新数组,nextTable = null,// 其他线程就不会进入transfer方法了,看addCount方法// if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 //||sc == rs + MAX_RESIZERS || (nt = nextTable) == null //||transferIndex <= 0) break;nextTable = null;table = nextTab;// 设置扩容临界值32*2-32/2=4sizeCtl = (n << 1) - (n >>> 1);return;}// 之前rs加的值,在这里先-1再-2,判断是否等于// resizeStamp(n)<<RESIZE_STAMP_SHIFT),等于就代表所有的协助线程都扩容完了,直接退出// 不等于的话finishing=true,i = n; 再次检查所有节点是否全部迁移if (UpareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}// 为空的不迁移,设置节点为fwd,代表迁移过了else if ((f = tabAt(tab, i)) == null)advance = casTabAt(tab, i, null, fwd);// 为MOVED代表已经迁移过了else if ((fh = f.hash) == MOVED)advance = true; // already processedelse {// 锁住进行迁移synchronized (f) {if (tabAt(tab, i) == f) { // 看看当前i处,还是不是f,扩容时可以removeNode<K,V> ln, hn; // ln不迁移,hn迁移,看图3if (fh >= 0) { // 链表int runBit = fh & n; // 看5.1基本解释Node<K,V> lastRun = f; // 看5.1基本解释// 看5.1基本解释for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}// 上述for结束,就会标记处一个节点,// 那个节点以后的节点和此节点一样,都是迁移或者不迁移的if (runBit == 0) { // 迁移的ln = lastRun; // 把lastRun赋值给lnhn = null;}else { // 不迁移的hn = lastRun; // 把lastRun赋值给hnln = null;}// 看图3,循环遍历,组图3的ln和hn链表for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}// 把ln放在新数组的原下标setTabAt(nextTab, i, ln);// 把hn放在新数组的原下标+32setTabAt(nextTab, i + n, hn);// 处理的节点设置为fwd,hash为-1setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {...  // 不会不讲解,哈哈哈哈}}}}}
}

总结

        看源码的时候,一定要结合高并发场景,这种场景不好形容,也不好画图,只能自己想象。希望大家能在经典的ConcurrentHashMap源码中学到有用的知识,学习它里面优秀的设计思想,应用到自己的开发当中,变得更加的优秀。可以多看几遍,就会对它的认识更加深刻,很多不明白的地方也会想通。

本文发布于:2024-01-31 00:37:14,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170663282224000.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:详解   源码   版本   方法   详细
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23