您当前的位置: 首页 >  容器

顧棟

暂无认证

  • 1浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】并发容器之ConcurrentHashMap(JDK1.8版)

顧棟 发布时间:2022-06-01 20:00:00 ,浏览量:1

ConcurrentHashMap(JDK1.8)

文章目录
  • ConcurrentHashMap(JDK1.8)
    • 组成
      • 内部类
        • Node
        • ForwardingNode
        • TreeNode
        • TreeBin
      • 数据结构
      • 核心方法
        • `int spread(int h)`计算hash值
      • 构造函数
      • PUT过程
        • putVal(K key, V value, boolean onlyIfAbsent)
        • initTable()
        • helpTransfer(tab, f)
        • transfer(tab, nextTab)
        • treeifyBin(tab, index)
        • tryPresize(int size)
        • addCount(long x, int check)
        • fullAddCount(long x, boolean wasUncontended)
      • GET过程
        • get(Object key)
      • Replace和Remove过程
        • replaceNode(Object key, V value, Object cv)
      • 其他
        • sizeCtl
在JDK1.5~1.7版本,Java使用了分段锁机制实现ConcurrentHashMap。在使用中发现最大并发度受Segment的个数限制。因此,在JDK1.8中,ConcurrentHashMap的实现原理摒弃了这种设计,而是选择了与HashMap类似的数组+链表+红黑树的方式实现,而加锁则采用CAS和synchronized实现。

组成 内部类 Node

拥有键值对的链表的基础存储结构

    static class Node implements Map.Entry {
        // key的hash值
        final int hash;
        // 关键字
        final K key;
        // 存储值
        volatile V val;
        // 下一个结点
        volatile Node next;

        Node(int hash, K key, V val, Node next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node find(int h, Object k) {
            Node e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }
ForwardingNode

在transfer操作期间插入到 bin 头部的节点。

static final class ForwardingNode extends Node {
    // node数组,存放的什么?
    final Node[] nextTable;
    ForwardingNode(Node[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

    Node find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node[] tab = nextTable;;) {
            Node e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh  h)
                    p = pl;
                else if (ph > 16)) & HASH_BITS;
}
构造函数
  1. ConcurrentHashMap(int initialCapacity)

创建一个新的空映射,其初始表大小可容纳指定数量的元素,无需动态调整大小。

参数

  • initialCapacity:初始容量。
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity = (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}
  1. ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

根据给定的元素数 (initialCapacity)、表密度 (loadFactor) 和并发更新线程数 (concurrencyLevel) 创建一个新的空映射,其初始表大小。

参数

  • initialCapacity:初始容量。 给定指定的负载因子,实现执行内部大小调整以适应这么多元素。
  • loadFactor:用于建立初始表大小的负载因子(表密度)。
  • concurrencyLevel:估计的并发更新线程数。 实现可以使用这个值作为大小提示。
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity > 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n = MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
PUT过程 putVal(K key, V value, boolean onlyIfAbsent)
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

	final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 计算hash值
        int hash = spread(key.hashCode());
        // 对应链表长度???
        int binCount = 0;
        // 存放数据的Node数组
        for (Node[] tab = table;;) {
            Node f; int n, i, fh;
            // 若Node[]还是null或没有元素,则需要进行初始化
            if (tab == null || (n = tab.length) == 0)
                // 初始化数组
                tab = initTable();
            // 至此代表容器中的数组已经成功初始化,找该 hash 值对应的数组下标,得到第一个节点 f;
            // 若f为null,使用CAS操作直接将新节点放到数组下标i的位置,执行成功直接退出for,执行失败代表存在竞争,继续执行
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // MOVED代表在扩容
            else if ((fh = f.hash) == MOVED)
                // 帮助数据迁移
                tab = helpTransfer(tab, f);
            else {
                // 此时数组当前下标的元素的头结点f,不为空
                V oldVal = null;
                // 对这个位置的头结点上锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 对此头结点的hash进行判断,若值>=0则说明是数组此下标的元素是个链表
                        if (fh >= 0) {
                            // 链表节点计数,由头结点从1开始
                            binCount = 1;
                            // 遍历链表
                            for (Node e = f;; ++binCount) {
                                K ek;
                                // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                // 到了链表的最末端,将这个新值放到链表的最后面
                                Node pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node(hash, key, value, null);
                                    break;
                                }
                            }
                        }
                        // 此时头结点的hash小于0,检查数组此下标的元素是不是红黑树
                        else if (f instanceof TreeBin) {
                            Node p;
                            //为啥这个计数从2开始?
                            binCount = 2;
                            // 调用红黑树的插值方法插入新节点
                            if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 判断是否尝试要将链表转换为红黑树,TREEIFY_THRESHOLD=8
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 总元素个数+1
        addCount(1L, binCount);
        return null;
    }
initTable()

使用 sizeCtl 中记录的大小初始化表。

假设sizeCtl=16

sizeCtl:  16  -1   12
sc     :  0   16   12
n      :  -   16
    private final Node[] initTable() {
        Node[] tab; int sc;
        // 判断Node数组是否被初始化了
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

最终数组的长度为16,此时sizeCtl为12。

helpTransfer(tab, f)
//扩容状态下其他线程对集合进行插入、修改、删除、合并、compute等操作时遇到 ForwardingNode 节点会调用该帮助扩容方法
final Node[] helpTransfer(Node[] tab, Node f) {
    Node[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab && (sc = sizeCtl) >> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex  1) ? (n >>> 3) / NCPU : n) = 0) {
            Node[] tab = table; int n;
            //如果数组初始化则进行初始化,这个选项主要是为批量插入操作方法 putAll 提供的
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node[] nt = (Node[])new Node[n];
                            table = nt;
                            sc = n - (n >>> 2);// 0.75 * n
                        }
                    } finally {
                        //初始化完成后 sizeCtl 用于记录当前集合的负载容量值,也就是触发集合扩容的阈值
                        sizeCtl = sc;
                    }
                }
            }
            else if (c = MAXIMUM_CAPACITY)
                break;
            //插入节点后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容会进入到下面这个 else if 分支
            else if (tab == table) {
                int rs = resizeStamp(n);
                // 说明此时有线程在执行扩容
                if (sc >> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex             
关注
打赏
1663402667
查看更多评论
0.0604s