您当前的位置: 首页 > 

连LinkedBlockingQueue源码都没看过,我怎么敢给你offer?

发布时间:2020-04-19 23:05:46 ,浏览量:0

0 前言

LinkedBlockingQueue - 单链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素从队列尾部插入,从队首获取元素.是深入并发编程的基础数据结构.

1 继承体系

  • Queue 作为最基础的接口,定义了队列的三类基本操作:

BlockingQueue 基于 Queue 加了阻塞功能:

  • 一直阻塞
  • 阻塞一定时间,返回特殊值

remove 方法,BlockingQueue 类注释中定义的是抛异常,但 LinkedBlockingQueue 中 remove 方法实际是返回 false。

2 属性 2.1 链式存储
  • 节点的数据结构
  • next 为当前元素的下一个,为 null 表示当前节点是最后一个
  • 链表容量 默认大小为 Integer.MAX_VALUE !显然太大了!禁用!
  • 链表已有元素大小 使用了 AtomicInteger,线程安全
  • 链表头、尾
2.2 锁
  • take 时的锁
  • take 的条件队列,condition 可理解为基于 ASQ 建立的条件队列
  • put 时的锁
  • put 的条件队列 take 锁和 put 锁,是为保证队列操作时的线程安全,设计两种锁,是为了 take 和 put 两种操作可同时进行,互不影响。
2.3 迭代器
  • 实现了自己的迭代器
private class Itr implements Iterator<E> { /*
     * Basic weakly-consistent iterator.  At all times hold the next
     * item to hand out so that if hasNext() reports true, we will
     * still have it to return even if lost race with a take etc.
     */ private Node<E> current; private Node<E> lastRet; private E currentElement; Itr() { fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { fullyUnlock(); } } public boolean hasNext() { return current != null; } /**
     * Returns the next live successor of p, or null if no such.
     *
     * Unlike other traversal methods, iterators need to handle both:
     * - dequeued nodes (p.next == p)
     * - (possibly multiple) interior removed nodes (p.item == null)
     */ private Node<E> nextNode(Node<E> p) { for (;;) { Node<E> s = p.next; if (s == p) return head.next; if (s == null || s.item != null) return s; p = s; } } public E next() { fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; lastRet = current; current = nextNode(current); currentElement = (current == null) ? null : current.item; return x; } finally { fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); fullyLock(); try { Node<E> node = lastRet; lastRet = null; for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (p == node) { unlink(p, trail); break; } } } finally { fullyUnlock(); } } } 
3 构造方法 3.1 无参
  • 默认为 Integer 的最大值
3.2 有参
  • 指定容量大小.链表头尾相等,节点值(item)都是 null
  • 已有集合数据
public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { // 集合元素不能为 null if (e == null) throw new NullPointerException(); // capacity 代表链表的大小,在这为 Integer.MAX_VALUE // 若集合大小 > IntegerMAX_VALUE,直接抛异常. if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } 
4 新增

以 offer 方法为例,将元素E添加到队列的末尾

public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 如果“队列已满”,则返回false,表示插入失败。  final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; // 新建“节点e”  Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; // 获取“插入锁putLock”  putLock.lock(); try { // 再次对“队列是不是满”的进行判断。  // 若“队列未满”,则插入节点。  if (count.get() < capacity) { // 插入节点 enqueue(node); // 将“当前节点数量”+1,并返回“原始的数量”  c = count.getAndIncrement(); // 如果在插入元素之后,队列仍然未满,则唤醒notFull上的等待线程。  if (c + 1 < capacity) notFull.signal(); } } finally { // 释放“插入锁putLock”  putLock.unlock(); } // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程  if (c == 0) signalNotEmpty(); return c >= 0; } 
enqueue
  • 将node添加到队列末尾,并设置node为新的尾节点
signalNotEmpty
  • 发出等待信号。 仅能从put/offer调用(否则通常不锁定takeLock),唤醒notEmpty上的等待线程.

新增数据成功后,在适当时机,会唤起 put 的等待线程(队列不满时),或者 take 的等待线程(队列不为空时),这样保证队列一旦满足 put 或者 take 条件时,立马就能唤起阻塞线程,继续运行,保证了唤起的时机不被浪费。

5 取出 take

以take()为例,取出并返回队列的头。若队列为空,则一直等待。

public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 获取“取出锁”,若当前线程是中断状态,则抛InterruptedException  takeLock.lockInterruptibly(); try { // 若“队列为空”,则一直等待。  while (count.get() == 0) { notEmpty.await(); } // 取元素  x = dequeue(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。  c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { // 释放“取出锁”  takeLock.unlock(); } // 如果在“取出元素之前”,队列是满的;则在取出元素之后,唤醒notFull上的等待线程。  if (c == capacity) signalNotFull(); return x; } 
dequeue
  • 删除队列的头节点,并将表头指向“原头节点的下一个节点”。
signalNotFull
  • 发出等待的信号。 仅能从take/poll调用.唤醒notFull上的等待线程.
关注
打赏
1688896170
查看更多评论

暂无认证

  • 0浏览

    0关注

    115984博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0690s