- ConcurrentLinkedQueue
- 类图
- 组成
- 数据结构
- 内部类
- 类的属性
- 类的构造函数
- 核心方法
- 入队列 boolean offer(E e)
- 出队列 E poll()
- 移除操作remove(Object o)
- first()
- isEmpty()
- size()
- 状态说明
一个头结点和一个尾结点
每个结点Node包含item和下一个结点next引用,默认情况下,tail等于head,元素为null。
内部类private static class Node {
// 元素
volatile E item;
// 下一个结点引用
volatile Node next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// 构造函数
Node(E item) {
// 设置item的值
UNSAFE.putObject(this, itemOffset, item);
}
// 设置item的值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node val) {
// 设置next域的值,并不会保证修改对其他线程立即可见
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// 比较并替换next域的值
boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// item域的偏移量
private static final long itemOffset;
// next域的偏移量
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
类的属性
// 链表首结点
private transient volatile Node head;
// 链表尾结点
private transient volatile Node tail;
类的构造函数
默认无参构造函数,创建一个空的ConcurrentLinkedQueue。
public ConcurrentLinkedQueue() {
// 初始化头结点与尾结点
head = tail = new Node(null);
}
将给的集合c转换成ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
public ConcurrentLinkedQueue(Collection c) {
Node h = null, t = null;
for (E e : c) { //遍历c集合
// 检查元素不为null
checkNotNull(e);
// 新建node结点
Node newNode = new Node(e);
// 首结点为空
if (h == null)
h = t = newNode;
else {
// 尾插法
// 直接头结点的next域
t.lazySetNext(newNode);
// 重新赋值头结点
t = newNode;
}
}
// 头结点为null
if (h == null)
// 新结头结点与尾结点
h = t = new Node(null);
head = h;
tail = t;
}
核心方法
方法名描述boolean add(E e)
在此队列的尾部插入指定元素。 由于队列是无界的,因此该方法永远不会抛出IllegalStateException
或返回false
。void updateHead(Node h, Node p)
尝试 CAS 前往 p。 如果成功,将旧头重新指向自身作为succ()
的哨兵,如下所示。Node succ(Node p)
如果p.next
已链接到self
,则返回p
的后继结点或头结点,这仅在使用现在不在列表中的过时指针遍历时才为真。boolean offer(E e)
在此队列的尾部插入指定元素。 由于队列是无界的,因此此方法永远不会返回false
。E poll()
此方法移除并返回此ConcurrentLinkedQueue的头部;如果此队列为空,则返回null。E peek()
此方法返回此ConcurrentLinkedQueue的头部,而不删除它Node first()
返回列表中的第一个活动(未删除)结点,如果没有,则返回 null。 这是 poll/peek 的另一种变体; 这里返回第一个结点,而不是元素。 我们可以让 peek() 成为 first() 的包装器,但这会花费额外的 volatile 读取项,并且需要添加一个重试循环来处理输给并发 poll() 的竞争的可能性。int size()
返回此队列中的元素数。 如果此队列包含多个 Integer.MAX_VALUE 元素,则返回 Integer.MAX_VALUE。请注意,与大多数集合不同,此方法不是恒定时间操作。 由于这些队列的异步特性,确定当前元素的数量需要 O(n) 遍历。 此外,如果在此方法执行期间添加或删除元素,则返回的结果可能不准确。 因此,这种方法在并发应用程序中通常不是很有用。boolean remove(Object o)
从此队列中移除指定元素的单个实例(如果存在)。 更正式地说,如果该队列包含一个或多个这样的元素,则删除一个元素 e 使得 o.equals(e)。 如果此队列包含指定的元素(或等效地,如果此队列因调用而更改),则返回 true。
入队列 boolean offer(E e)
public boolean offer(E e) {
// 元素不为null
checkNotNull(e);
// 新建一个结点
final Node newNode = new Node(e);
// 无限循环遍历链表
// 开启一个从tail指针开始遍历的循环,p指向当前疑似尾结点;
for (Node t = tail, p = t;;) {
// 初始化q=p的下一个结点;
Node q = p.next;
// 如果q==null,说明此刻p确实是尾结点,新结点应该插到后面;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) { // CAS将新结点链接到p结点上
//首次添加时,p 等于t,不进行尾结点更新,所以所尾结点存在滞后性
//并发环境,可能存添加/删除,tail就更难保证正确指向最后结点。
if (p != t) // 和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个结点;
// p不等t,CAS替换新结点为尾结点
casTail(t, newNode); // cas操作安全移动tail指针。
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) //p==q,也即p的next字段指向自身,说明p是一个脱离了链表的结点,需要找一个结点重新开始遍历。
// 原来的尾结点与现在的尾结点是否相等,若相等,则p为head,否则,赋值为现在的尾结点
// 当tail不执行最后结点时,如果执行出列操作,很有可能将tail也给移除了
// 此时需要对tail结点进行复位,复位到head结点
// 从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.按道理直接p=q跳到下一个结点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。
p = (p != t && t != (t = tail)) ? t : q;
}
}
入队列主要考虑两件事:
-
新增新结点,设置为当前未结点的next
-
是否更新tail结点,若tail的next不为空,则将入队列的结点变为新的tail结点;若tail的next为空,则不更新tail结点。
通过减少更新tail的次数,提高入队效率
public E poll() {
restartFromHead:
for (;;) {
// 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。
for (Node h = head, p = h, q;;) {
E item = p.item;
// 如果p结点的数据不为空,那么p肯定就是真正的第一个结点,只要能将结点的数据字段(item)置为null,出队列就算成功);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// p == h,说明刚出队列的p就是遍历开始的第一个结点,有2种情况
// 1)没有并发poll,head指向了空结点,队列里面只有且仅有一个空结点,此时没有必要做head移动;
// 2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。
if (p != h) // hop two nodes at a time
// 如果p.next==null,说明p是尾结点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了,此时将head置为尾结点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了(参考第一结总结的结点属性),这种情况下,只能重新开始poll。
else if (p == q)
continue restartFromHead;
else
// 正常迭代下一个结点。
p = q;
}
}
}
将首结点的指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集。
updateHead(Node h, Node p)
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
// h的next指向自己,从链表中断开
h.lazySetNext(h);
}
h.lazySetNext(h)
操作没有volatile
的语义,有可能对其他线程暂时不可见。 假设poll方法执行到((q = p.next) == null)
时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么(p == q)
的判断也可能失败,最终执行到 p = q
,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为这种情况造成的性能损失低于volatile造成的性能损失)。
public boolean remove(Object o) {
if (o != null) {
Node next, pred = null;
for (Node p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
// 判断当前是不是空结点。
if (item != null) {
// 如果结点数据与输入参数相等,说明这是一个需要移除的结点,否则应该跳到下一个结点;
if (!o.equals(item)) {
// 结点数据不相等,取出当前结点p的下一个结点,然后重新开始循环;
next = succ(p);
continue;
}
// 结点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败;
removed = p.casItem(item, null);
}
// 执行到这一行说明,p是空结点,(本来就是空,或在L5被清空);取出next结点;
next = succ(p);
// 由于p是空结点,尝试将p的前继和后继相连;
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}
first()
Node first() {
restartFromHead:
// 自旋
for (;;) {
// 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。
for (Node h = head, p = h, q;;) {
// p中是否存在元素
boolean hasItem = (p.item != null);
// 若有元素或next为null则更新首结点
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
// 若结点中有元素则返回此结点,链表中存在包含元素的结点
// 若结点中没有元素则返回null,链表中没有实际数据。
return hasItem ? p : null;
}
// p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了,这种情况下,只能重新开始first。
else if (p == q)
continue restartFromHead;
else
// 正常迭代下一个结点。
p = q;
}
}
}
isEmpty()
public boolean isEmpty() {
// 通过first的返回值判断链表是否存在实际数据
return first() == null;
}
size()
public int size() {
int count = 0;
// 从实际的首结点开始遍历链表,计算链表中实际包含数据的结点数
for (Node p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
// 链表最大值是Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
}
如果 p.next 已链接到 self,则返回首结点。否则返回下一个结点。
final Node succ(Node p) {
Node next = p.next;
return (p == next) ? head : next;
}
状态说明
在整个生命周期内,算法保持以下属性:
- 链表里面至少会有一个节点,数据字段为null的节点是空节点;
- 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
- TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
- 和队列断开的节点,next字段指向自身;
- 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。
参考
作者:longhuihu 链接:https://www.jianshu.com/p/ce6108e4b2c4