您当前的位置: 首页 >  Java

wespten

暂无认证

  • 1浏览

    0关注

    899博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

深入学习java源码之LinkedBlockingQueue.offer()与LinkedBlockingQueue.fullyLock()

wespten 发布时间:2019-01-31 16:13:02 ,浏览量:1

深入学习java源码之LinkedBlockingQueue.offer()与LinkedBlockingQueue.fullyLock()

Java集合类库将集合的接口与实现分离。同样的接口,可以有不同的实现。

Java集合类的基本接口是Collection接口。而Collection接口必须继承java.lang.Iterable接口。

以下图表示集合框架的接口,java.lang以及java.util两个包里的。其他部分可以从左向右看,比如Collection的Subinterfaces有List,Set以及Queue等。

LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。

LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。

LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下: 

put元素原理

基本过程:

1.判断元素是否为null,为null抛出异常

2.加锁(可中断锁)

3.判断队列长度是否到达容量,如果到达一直等待

4.如果没有队满,enqueue()在队尾加入元素

5.队列长度加1,此时如果队列还没有满,调用signal唤醒其他堵塞队列

  if (e == null) throw new NullPointerException();
        
         int c = -1;
         Node node = new Node(e);
         final ReentrantLock putLock = this.putLock;
         final AtomicInteger count = this.count;
         putLock.lockInterruptibly();
         try {
             while (count.get() == capacity) {
                 notFull.await();
             }
             enqueue(node);
             c = count.getAndIncrement();
             if (c + 1 < capacity)
                 notFull.signal();
         } finally {
             putLock.unlock();
         }

take元素原理

 基本过程:

1.加锁(依旧是ReentrantLock),注意这里的锁和写入是不同的两把锁

2.判断队列是否为空,如果为空就一直等待

3.通过dequeue方法取得数据

3.取走元素后队列是否为空,如果不为空唤醒其他等待中的队列

 public E take() throws InterruptedException {
         E x;
         int c = -1;
         final AtomicInteger count = this.count;
         final ReentrantLock takeLock = this.takeLock;
         takeLock.lockInterruptibly();
         try {
             while (count.get() == 0) {
                 notEmpty.await();
             }
             x = dequeue();
             c = count.getAndDecrement();
             if (c > 1)
                 notEmpty.signal();
         } finally {
             takeLock.unlock();
         }
         if (c == capacity)
             signalNotFull();
         return x;
     }

总结 LinkedBlockingQueue可以与ArrayBlockingQueue做一个比较。  相同点有如下2点:  1. 不允许元素为null  2. 线程安全的队列

不同点有如下几点:  1. ArrayBlockingQueue底层基于定长的数组,所以容量限制了;LinkedBlockingQueue底层基于链表实现队列,所以容量可选,如果不设置,那么容量是int的最大值  2. ArrayBlockingQueue内部维持一把锁和两个条件,同一时刻只能有一个线程队列的一端操作;LinkedBlockingQueue内部维持两把锁和两个条件,同一时刻可以有两个线程在队列的两端操作,但同一时刻只能有一个线程在一端操作。  3. LinkedBlockingQueue的remove()类似方法时,由于需要对整个队列链表实现遍历,所以需要获取两把锁,对两端加锁。

java源码

Modifier and TypeMethod and Descriptionvoidclear()

从这个队列中原子地删除所有的元素。

booleancontains(Object o)

如果此队列包含指定的元素,则返回 true

intdrainTo(Collection c)

如果此集合包含指定 集合中的所有元素,则返回true。

booleanisEmpty()

如果此集合不包含元素,则返回 true 。

abstract Iteratoriterator()

返回包含在该集合中的元素的迭代器。

booleanremove(Object o)

从该集合中删除指定元素的单个实例(如果存在)(可选操作)。

booleanremoveAll(Collection c)

删除指定集合中包含的所有此集合的元素(可选操作)。

booleanretainAll(Collection c)

仅保留此集合中包含在指定集合中的元素(可选操作)。

abstract intsize()

返回此集合中的元素数。

Object[]toArray()

返回一个包含此集合中所有元素的数组。

 T[]toArray(T[] a)

返回包含此集合中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型。

StringtoString()

返回此集合的字符串表示形式。

package java.util;

public abstract class AbstractCollection implements Collection {

    protected AbstractCollection() {
    }

    public abstract Iterator iterator();

    public abstract int size();

    public boolean isEmpty() {
        return size() == 0;
    }

    public boolean contains(Object o) {
        Iterator it = iterator();
        if (o==null) {
            while (it.hasNext())
                if (it.next()==null)
                    return true;
        } else {
            while (it.hasNext())
                if (o.equals(it.next()))
                    return true;
        }
        return false;
    }

    public Object[] toArray() {
        // Estimate size of array; be prepared to see more or fewer elements
        Object[] r = new Object[size()];
        Iterator it = iterator();
        for (int i = 0; i < r.length; i++) {
            if (! it.hasNext()) // fewer elements than expected
                return Arrays.copyOf(r, i);
            r[i] = it.next();
        }
        return it.hasNext() ? finishToArray(r, it) : r;
    }
	
    @SuppressWarnings("unchecked")
    public  T[] toArray(T[] a) {
        // Estimate size of array; be prepared to see more or fewer elements
        int size = size();
        T[] r = a.length >= size ? a :
                  (T[])java.lang.reflect.Array
                  .newInstance(a.getClass().getComponentType(), size);
        Iterator it = iterator();

        for (int i = 0; i < r.length; i++) {
            if (! it.hasNext()) { // fewer elements than expected
                if (a == r) {
                    r[i] = null; // null-terminate
                } else if (a.length < i) {
                    return Arrays.copyOf(r, i);
                } else {
                    System.arraycopy(r, 0, a, 0, i);
                    if (a.length > i) {
                        a[i] = null;
                    }
                }
                return a;
            }
            r[i] = (T)it.next();
        }
        // more elements than expected
        return it.hasNext() ? finishToArray(r, it) : r;
    }	
	
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;	
	
    @SuppressWarnings("unchecked")
    private static  T[] finishToArray(T[] r, Iterator it) {
        int i = r.length;
        while (it.hasNext()) {
            int cap = r.length;
            if (i == cap) {
                int newCap = cap + (cap >> 1) + 1;
                // overflow-conscious code
                if (newCap - MAX_ARRAY_SIZE > 0)
                    newCap = hugeCapacity(cap + 1);
                r = Arrays.copyOf(r, newCap);
            }
            r[i++] = (T)it.next();
        }
        // trim if overallocated
        return (i == r.length) ? r : Arrays.copyOf(r, i);
    }

    private static int hugeCapacity(int minCapacity) {
        if (minCapacity < 0) // overflow
            throw new OutOfMemoryError
                ("Required array size too large");
        return (minCapacity > MAX_ARRAY_SIZE) ?
            Integer.MAX_VALUE :
            MAX_ARRAY_SIZE;
    }	
	
    public boolean add(E e) {
        throw new UnsupportedOperationException();
    }	
	
    public boolean remove(Object o) {
        Iterator it = iterator();
        if (o==null) {
            while (it.hasNext()) {
                if (it.next()==null) {
                    it.remove();
                    return true;
                }
            }
        } else {
            while (it.hasNext()) {
                if (o.equals(it.next())) {
                    it.remove();
                    return true;
                }
            }
        }
        return false;
    }	
	
    public boolean containsAll(Collection c) {
        for (Object e : c)
            if (!contains(e))
                return false;
        return true;
    }	
	
    public boolean addAll(Collection c) {
        Objects.requireNonNull(c);
        boolean modified = false;
        Iterator it = iterator();
        while (it.hasNext()) {
            if (c.contains(it.next())) {
                it.remove();
                modified = true;
            }
        }
        return modified;
    }	
	
    public boolean retainAll(Collection c) {
        Objects.requireNonNull(c);
        boolean modified = false;
        Iterator it = iterator();
        while (it.hasNext()) {
            if (!c.contains(it.next())) {
                it.remove();
                modified = true;
            }
        }
        return modified;
    }
	
    public void clear() {
        Iterator it = iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
        }
    }	
	
    public String toString() {
        Iterator it = iterator();
        if (! it.hasNext())
            return "[]";

        StringBuilder sb = new StringBuilder();
        sb.append('[');
        for (;;) {
            E e = it.next();
            sb.append(e == this ? "(this Collection)" : e);
            if (! it.hasNext())
                return sb.append(']').toString();
            sb.append(',').append(' ');
        }
    }

}	

 

Modifier and TypeMethod and Descriptionbooleanadd(E e)

将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, true在成功后返回 IllegalStateException如果当前没有可用空间,则抛出IllegalStateException。

booleancontains(Object o)

如果此队列包含指定的元素,则返回 true

intdrainTo(Collection
关注
打赏
1665965058
查看更多评论
0.0548s