深入学习java源码之LinkedBlockingQueue.offer()与LinkedBlockingQueue.fullyLock()
Java集合类库将集合的接口与实现分离。同样的接口,可以有不同的实现。
Java集合类的基本接口是Collection接口。而Collection接口必须继承java.lang.Iterable接口。
以下图表示集合框架的接口,java.lang以及java.util两个包里的。其他部分可以从左向右看,比如Collection的Subinterfaces有List,Set以及Queue等。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。
LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。
LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下:
put元素原理
基本过程:
1.判断元素是否为null,为null抛出异常
2.加锁(可中断锁)
3.判断队列长度是否到达容量,如果到达一直等待
4.如果没有队满,enqueue()在队尾加入元素
5.队列长度加1,此时如果队列还没有满,调用signal唤醒其他堵塞队列
if (e == null) throw new NullPointerException();
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
take元素原理
基本过程:
1.加锁(依旧是ReentrantLock),注意这里的锁和写入是不同的两把锁
2.判断队列是否为空,如果为空就一直等待
3.通过dequeue方法取得数据
3.取走元素后队列是否为空,如果不为空唤醒其他等待中的队列
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
总结 LinkedBlockingQueue可以与ArrayBlockingQueue做一个比较。 相同点有如下2点: 1. 不允许元素为null 2. 线程安全的队列
不同点有如下几点: 1. ArrayBlockingQueue底层基于定长的数组,所以容量限制了;LinkedBlockingQueue底层基于链表实现队列,所以容量可选,如果不设置,那么容量是int的最大值 2. ArrayBlockingQueue内部维持一把锁和两个条件,同一时刻只能有一个线程队列的一端操作;LinkedBlockingQueue内部维持两把锁和两个条件,同一时刻可以有两个线程在队列的两端操作,但同一时刻只能有一个线程在一端操作。 3. LinkedBlockingQueue的remove()类似方法时,由于需要对整个队列链表实现遍历,所以需要获取两把锁,对两端加锁。
java源码
Modifier and TypeMethod and Descriptionvoid
clear()
从这个队列中原子地删除所有的元素。
boolean
contains(Object o)
如果此队列包含指定的元素,则返回 true
。
int
drainTo(Collection c)
如果此集合包含指定 集合中的所有元素,则返回true。
boolean
isEmpty()
如果此集合不包含元素,则返回 true 。
abstract Iterator
iterator()
返回包含在该集合中的元素的迭代器。
boolean
remove(Object o)
从该集合中删除指定元素的单个实例(如果存在)(可选操作)。
boolean
removeAll(Collection c)
删除指定集合中包含的所有此集合的元素(可选操作)。
boolean
retainAll(Collection c)
仅保留此集合中包含在指定集合中的元素(可选操作)。
abstract int
size()
返回此集合中的元素数。
Object[]
toArray()
返回一个包含此集合中所有元素的数组。
T[]
toArray(T[] a)
返回包含此集合中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型。
String
toString()
返回此集合的字符串表示形式。
package java.util;
public abstract class AbstractCollection implements Collection {
protected AbstractCollection() {
}
public abstract Iterator iterator();
public abstract int size();
public boolean isEmpty() {
return size() == 0;
}
public boolean contains(Object o) {
Iterator it = iterator();
if (o==null) {
while (it.hasNext())
if (it.next()==null)
return true;
} else {
while (it.hasNext())
if (o.equals(it.next()))
return true;
}
return false;
}
public Object[] toArray() {
// Estimate size of array; be prepared to see more or fewer elements
Object[] r = new Object[size()];
Iterator it = iterator();
for (int i = 0; i < r.length; i++) {
if (! it.hasNext()) // fewer elements than expected
return Arrays.copyOf(r, i);
r[i] = it.next();
}
return it.hasNext() ? finishToArray(r, it) : r;
}
@SuppressWarnings("unchecked")
public T[] toArray(T[] a) {
// Estimate size of array; be prepared to see more or fewer elements
int size = size();
T[] r = a.length >= size ? a :
(T[])java.lang.reflect.Array
.newInstance(a.getClass().getComponentType(), size);
Iterator it = iterator();
for (int i = 0; i < r.length; i++) {
if (! it.hasNext()) { // fewer elements than expected
if (a == r) {
r[i] = null; // null-terminate
} else if (a.length < i) {
return Arrays.copyOf(r, i);
} else {
System.arraycopy(r, 0, a, 0, i);
if (a.length > i) {
a[i] = null;
}
}
return a;
}
r[i] = (T)it.next();
}
// more elements than expected
return it.hasNext() ? finishToArray(r, it) : r;
}
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
@SuppressWarnings("unchecked")
private static T[] finishToArray(T[] r, Iterator it) {
int i = r.length;
while (it.hasNext()) {
int cap = r.length;
if (i == cap) {
int newCap = cap + (cap >> 1) + 1;
// overflow-conscious code
if (newCap - MAX_ARRAY_SIZE > 0)
newCap = hugeCapacity(cap + 1);
r = Arrays.copyOf(r, newCap);
}
r[i++] = (T)it.next();
}
// trim if overallocated
return (i == r.length) ? r : Arrays.copyOf(r, i);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError
("Required array size too large");
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
public boolean add(E e) {
throw new UnsupportedOperationException();
}
public boolean remove(Object o) {
Iterator it = iterator();
if (o==null) {
while (it.hasNext()) {
if (it.next()==null) {
it.remove();
return true;
}
}
} else {
while (it.hasNext()) {
if (o.equals(it.next())) {
it.remove();
return true;
}
}
}
return false;
}
public boolean containsAll(Collection c) {
for (Object e : c)
if (!contains(e))
return false;
return true;
}
public boolean addAll(Collection c) {
Objects.requireNonNull(c);
boolean modified = false;
Iterator it = iterator();
while (it.hasNext()) {
if (c.contains(it.next())) {
it.remove();
modified = true;
}
}
return modified;
}
public boolean retainAll(Collection c) {
Objects.requireNonNull(c);
boolean modified = false;
Iterator it = iterator();
while (it.hasNext()) {
if (!c.contains(it.next())) {
it.remove();
modified = true;
}
}
return modified;
}
public void clear() {
Iterator it = iterator();
while (it.hasNext()) {
it.next();
it.remove();
}
}
public String toString() {
Iterator it = iterator();
if (! it.hasNext())
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}
}
Modifier and TypeMethod and Description
boolean
add(E e)
将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, true
在成功后返回 IllegalStateException
如果当前没有可用空间,则抛出IllegalStateException。
boolean
contains(Object o)
如果此队列包含指定的元素,则返回 true
。
int
drainTo(Collection
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?