您当前的位置: 首页 > 

小志的博客

暂无认证

  • 1浏览

    0关注

    1217博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

JDK1.8中ConcurrentHashMap源码解析

小志的博客 发布时间:2021-11-16 21:45:00 ,浏览量:1

目录
    • 一、ConcurrentHashMap使用场景
    • 二、put方法的整体流程
    • 三、第一种情况(初始化table数组)
      • 3.1、initTable源码如下:
      • 3.2、sizeCtl
      • 3.3、流程解释
    • 四、第二种情况(寻址后的位置没有被占用)
      • 4.1、源码如下
      • 4.2、ABASE和ASHIFT 变量
      • 4.3、tabAt 方法
      • 4.4、casTabAt 方法
      • 4.5、流程解释
    • 五、第三种情况(如果寻址后的位置是正在迁移状态)
      • 5.1、源码
      • 5.2、helpTransfer方法
    • 六、其他情况( 将节点插入到链表中或者红黑树中。)
      • 6.1、源码如下
      • 6.2、流程解释
      • 6.3、第一部分:向链表中插入Node的操作
      • 6.4、第三部分:扩容或者转换元素存储类型的操作
        • 6.4.1 、treeifyBin方法
        • 6.4.2、 tryPresize方法
          • 6.4.2.1、 第一个判断(table数组没有初始化完毕)
          • 6.4.2.2、第二个判断(数组超过最大值,或者扩容发生越界)
          • 6.4.2.3、第三个判断(table还是那个table,这个过程中没有被其他线程重建过
        • 6.4.3、transfer方法
          • 6.4.3.1 步骤1:计算迁移时的步长
          • 6.4.3.2 步骤2:初始化nextTab
          • 6.4.3.3 步骤3:扩容及数据迁移
            • 6.4.3.3.1 case1:扩容迁移结束逻辑
            • 6.4.3.3.2 case2:如果下标i处没有节点,则不需要进行扩容迁移操作
            • 6.4.3.3.3 case3:下标为i的这个位置已经被处理过了
            • 6.4.3.3.4 case4:执行扩容迁移操作
        • 6.4.4、tableSizeFor方法
        • 6.4.5、numberOfLeadingZeros方法
    • 七、addCount将ConcurrentHashMap中存储的k,v总数+1
      • 7.1、源码
      • 7.2 第一种情况(计算当前存入key-value的总数)
        • 7.2.1、fullAddCount方法
          • 7.2.1.1、case1:counterCells不为空且数组里面有元素
          • 7.2.1.2、case2:cellsBusy为0且counterCells为空
          • 7.2.1.3、case3:尝试修改baseCount的值
        • 7.2.2、sumCount方法
      • 7.3 第二种情况(存储的总kv数量达到了阈值,执行扩容)

一、ConcurrentHashMap使用场景
  • 我们平时最常用的HashMap其实不是线程安全的,而当我们有多线程使用场景的时候,即想线程安全,又想拥有Map的能力,我们可以选择HashTable,因为它是针对我们常用的方法上面加上了synchronize锁,但是在高并发的场景下,效率低是它的弊端。如果我们还非常在意效率,那么我们更好的选择是使用ConcurrentHashMap。

  • 示例:

    启动100个线程,每个线程循环100次,像容器中应该放入10000个元素。我们看到运行结果可以发现,HashMap并不是10000,这就说明,它在多线程并发的情况下,出现了线程不安全的问题。而ConcurrentHashMap返回的结果是没有问题的。

    在这里插入图片描述

二、put方法的整体流程

1、put方法源码

 public V put(K key, V value) {
        return putVal(key, value, false);
}

2、在put方法中,调用了putVal方法。由于该方法代码较多,我们只保留框架性质的代码,这样会更方便我们理解。如下所示:

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        ... ...
        for (Node[] tab = table;;) {
            Node f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
 				... 针对f链表或红黑树进行添加Node节点操作,执行完毕后break ...  
 				}  
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

3、上面的代码,可以分为两部分内容,

(1)、第一部分:首先开启了无限循环,在里面进行了4中情况的判断。

  • 第一种情况:如果table数组需要被创建。 如果table数组为null或者长度为0,则创建table数组。

  • 第二种情况:如果寻址后的位置没有被占用。 创建Node节点,插入到这个位置

  • 第三种情况:如果寻址后的位置是正在迁移状态。 加入到迁移大军中,帮助一起进行扩容迁移操作。

  • 第四种情况:其他情况 将节点插入到链表中或者红黑树中。

(2)、 第二部分:执行addCount,将ConcurrentHashMap中存储的k,v总数+1。

  • 下面将骤一的对上述的四个情况和addCount步骤进行分析。
三、第一种情况(初始化table数组) 3.1、initTable源码如下:
 private final Node[] initTable() {
        Node[] tab; int sc;
        //table=null
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl =0
            //如果sizeCtl为-1,则表示table数组正在被别的线程初始化
            if ((sc = sizeCtl)  0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        // n =16
                        Node[] nt = (Node[])new Node[n];
                        table = tab = nt;
                        //n>>>2=10000>>>2=00100=4 (16/2/2=4)
                        /**sc= (4-1)/4n =0.75*n*/
                        sc = n - (n >>> 2); //sc =16-4 =12
                    }
                } finally {
                    //sc=12, sizeCtl =12
                    sizeCtl = sc;
                }
                break;
            }
        }
        //返回size为16的空的Node数组
        return tab;
    }
3.2、sizeCtl
  • 如果sizeCtl为-1,则表示table数组正在被别的线程初始化。默认sizeCtl=0,当table数组初始化或者扩容完毕的时候,sizeCtl会表示扩容阈值。 在这里插入图片描述

  • 默认table数组的长度为16 在这里插入图片描述

3.3、流程解释
  • 我们通过源码可以看到,如果table没有被初始化完毕的话,那么会一直在while循环中,直到table数组初始化完毕:

    while ((tab = table) == null || tab.length == 0)
    
  • 假设现在有4条线程同时的要去创建table数组,那么当有一条线程已经优先开始初始化table数组操作的时候,sizeCtl就会被赋值为-1,那么其他线程就会执行Thread.yield()让出cpu,并继续while循环,然后再执行Thread.yield(),在那spin旋转,直到那个最早的线程创建好创建table数组之后,所有线程都会跳出while继续往下执行。

    private final Node[] initTable() {
            Node[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl)  var1);
                * 返回数组类型的[比例因子](其实就是【数组中元素偏移地址的增量】,
                * 因为数组中的元素的地址是连续的)。
                * 此方法不适用于数组类型为“narrow”类型的数组,“narrow”类型的数组类型
                * 使用此方法会返回0(这里narrow应该是狭义的意思,但是具体指哪些类型
                * 暂时不明确)。
                * Unsafe类中已经初始化了很多类似的常量如ARRAY_BOOLEAN_INDEX_SCALE等。
                **/
                int scale = U.arrayIndexScale(ak);//4
                if ((scale & (scale - 1)) != 0)// 4 & (4-1) =0100 & 0011 =0000
                    throw new Error("data type scale not a power of two");
                 /**
                 *Integer.numberOfLeadingZeros(int i) 
                 *给定一个int类型数据,返回这个数据的二进制串中从最左边算起连续的“0”的
                 *总数量。
                 *而ASHIFT其实就是将scale数值转换为[按位左移对应的数值],
                 *即:通过scale =4,name计算出ASHIFT=2,
                 *而N 2); /** 3/4*n */
                }
            } finally {
                /** 将sizeCtl设置为3/4的table数组长度 */
                sizeCtl = sc;
            }
        }
    }
    
  • 执行过程如下所示 (1)、首先,将sizeCtl赋值为-1,则表示针对table正在操作中。 (2)、其次,创建table,且sc赋值为table数组的3/4长度。 (3)、最后,将sizeCtl赋值为扩容阈值,表示针对table的操作已经执行完毕。

6.4.2.2、第二个判断(数组超过最大值,或者扩容发生越界)
  • 源码如下

    在这里插入图片描述

6.4.2.3、第三个判断(table还是那个table,这个过程中没有被其他线程重建过

1、源码如下 在这里插入图片描述 2、resizeStamp方法的具体作用是返回table数组长度相关信息。源码如下所见: 在这里插入图片描述

3、resizeStamp方法的解释

  • 其中,Integer.numberOfLeadingZeros(n)具体逻辑,请参考【6.4.5> numberOfLeadingZeros(int i)】
  • 我么可以举例上面方面发计算过程: 假设n=16,那么二进制就是00010000,那么从左侧最高位开始计算,连续一共有27个0,那么Integer.numberOfLeadingZeros(16)就返回27。 RESIZE_STAMP_BITS=16,那么1> 16 == 0) { // eg1: 0100>>>16=0000 n += 16; // eg1: n=1+16=17 i 31; // eg1: n=29-(0100 0000 0000 0000 0000 0000 0000 0000 >>> 31)=29-0=29 return n; }
  • 方法源码看起来不太直观,我们通过图来了解一下 在这里插入图片描述在这里插入图片描述

七、addCount将ConcurrentHashMap中存储的k,v总数+1 7.1、源码
  • 源码如下

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
         if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
             ... 第一种情况,计算当前存入key-value的总数 ...
         }
         if (check >= 0) {
           ... 第二种情况,存储的总kv数量达到了阈值,执行扩容 ...
         }
     }
    
  • 源码解释 (1)、第一种情况:计算当前存入key-value的总数 (2)、第二种情况:存储的总kv数量达到了阈值,执行扩容

  • addCount方法作用 维护ConcurrentHashMap中总的kv数量值,当存储的总kv值超过了阈值,那么会执行扩容操作。

7.2 第一种情况(计算当前存入key-value的总数)
  • 源码如下

    if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 
        CounterCell a;
        long v;
        int m;
        boolean uncontended = true;
        if (as == null || 
                (m = as.length - 1) = NCPU) {
                collide = false; // At max size or stale
            }
            else if (!collide) {
                collide = true;
            }
            else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                ... 多线程之间设置CounterCell的value值时发生碰撞,那么扩展CounterCell的长度,以减少碰撞次数 ...
            }
            ... ...
        }
        /**
         * case2: cellsBusy为0且counterCells为空
         */
        else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            ... 创建一个长度为2的CounterCell数组,将x赋值进数组,跳出循环 ...
        }
        /**
         * case3: 尝试修改baseCount的值
         */
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) {
            ... 如果修改成功,则跳出循环 ...
            break; 
        }
    }
    
  • 源码解释 (1)、fullAddCount代码较多,总体可以分为两部分:第一部分,如果获得随机数为0,则初始化当前线程的探针哈希值。第二部分,开启无限循环,利用CounterCell进行计数。 (2)、 第一部分比较简单。只需要记住一下几个方法的作用: ThreadLocalRandom.getProbe():用来获得随机数 ThreadLocalRandom.localInit():初始化当前线程的探针哈希值 ThreadLocalRandom.advanceProbe(h):更改当前线程的探针哈希值 (3)、第二部分,利用CounterCell进行计数一共分为3种情况: case1:counterCells不为空且数组里面有元素。 case2:cellsBusy为0且counterCells为空。 case3:尝试修改baseCount的值。 (4)、这里我通过一张图,先介绍一下代码里的逻辑: 在这里插入图片描述 在这里插入图片描述

7.2.1.1、case1:counterCells不为空且数组里面有元素
  • 部分源码

    CounterCell[] as;
    CounterCell a;
    int n;
    long v;
    if ((as = counterCells) != null && (n = as.length) > 0) {
        if ((a = as[(n - 1) & h]) == null) { /** 如果随机数h待插入的下标位置没有元素 */
            if (cellsBusy == 0) {            // Try to attach new Cell
                CounterCell r = new CounterCell(x); // Optimistic create
                /** 通过CAS将cellsBusy设置为1,来表示正在操作CounterCell;操作完毕后,将cellsBusy设置为0,其他线程可以继续操作 */
                if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    boolean created = false;
                    try {    // Recheck under lock
                        CounterCell[] rs;
                        int m;
                        int j;
                        if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                            rs[j] = r; /** 在下标j处,插入新建的CounterCell r */
                            created = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (created) {
                        break;
                    }
                    continue;           // Slot is now non-empty
                }
            }
            collide = false; /** collide:碰撞,cellsBusy不等于0时,表示有其他线程正在操作CounterCell,collide设置为false */
        }
        else if (!wasUncontended) { /** 如果wasUncontended=false(表示当前线程CAS竞争失败),则将wasUncontended重置为true */
            wasUncontended = true;      // CAS already known to fail,Continue after rehash
        }
        else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) { /** 尝试将a的value值加x */
            break;
        }
        else if (counterCells != as || n >= NCPU) {
            collide = false;            // At max size or stale
        }
        else if (!collide) {
            collide = true;
        }
        /** 多线程之间设置CounterCell的value值时发生碰撞,那么扩展CounterCell的长度,以减少碰撞次数 */
        else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            try {
                if (counterCells == as) {// Expand table unless stale
                    CounterCell[] rs = new CounterCell[n             
关注
打赏
1661269038
查看更多评论
0.2496s