您当前的位置: 首页 >  Java

wespten

暂无认证

  • 1浏览

    0关注

    899博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

深入学习java源码之DelayQueue.poll()与DelayQueue.peek()

wespten 发布时间:2019-02-01 18:46:37 ,浏览量:1

深入学习java源码之DelayQueue.poll()与DelayQueue.peek()

DelayQueue是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于已有的PriorityBlockingQueue实现

DelayQueue是阻塞队列中非常有用的一种队列,经常被用于缓存或定时任务等的设计,例如:

a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。

b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。

c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

笨办法是,使用一个后台线程,遍历所有对象,挨个检查。 但对象数量过多时,存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。

这场景,使用DelayQueue最适合了。 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。 该队列的头部是延迟期满后保存时间最长的Delayed元素(即最想优先处理的元素)。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。 当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。 即使无法使用take或poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size方法同时返回到期和未到期元素的计数。此队列不允许使 null元素。

DelayQueue队列中保存的是实现了Delayed接口的实现类,里面必须实现getDelay()和compareTo()方法。前者用于取DelayQueue里面的元素时判断是否到了延时时间,否则不予获取,是则获取。 compareTo()方法用于进行队列内部的排序。compareTo 方法需提供与 getDelay 方法一致的排序。

可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。通过PriorityQueue,可以优先处理最紧急的元素,利用BlockingQueue,能防止不必要的不断轮询,提高了性能。在很多需要回收对象的场景都能用上。

 

DelayQueue也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue中的所有元素必须实现Delayed接口:

/**
 * 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
 * 

* 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。 */ public interface Delayed extends Comparable { /** * 返回与此对象相关的剩余有效时间,以给定的时间单位表示. */ long getDelay(TimeUnit unit); }

可以看到,Delayed接口除了自身的getDelay方法外,还实现了Comparable接口。getDelay方法用于返回对象的剩余有效时间,实现Comparable接口则是为了能够比较两个对象,以便排序。

也就是说,如果一个类实现了Delayed接口,当创建该类的对象并添加到DelayQueue中后,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。

另外,由于DelayQueue内部委托了PriorityBlockingQueue对象来实现所有方法,所以能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素。

DelayQueue的特点简要概括如下:

  1. DelayQueue是无界阻塞队列;
  2. 队列中的元素必须实现Delayed接口,元素过期后才会从队列中取走;

延迟队列DelayQueue,take()该方法的主要功能是从优先队列(PriorityQueue)取出一个最应该执行的任务(最优值),如果该任务的预订执行时间未到,则需要wait这段时间差。反之,如果时间到了,则返回该任务。而offer()方法是将一个任务添加到该队列中。

内部的PriorityQueue并非在构造时创建,而是对象创建时生成:

ublic class DelayQueue extends AbstractQueue
    implements BlockingQueue {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue q = new PriorityQueue();

    /**
     * leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程.
     * 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程
     */
    private Thread leader = null;

    /**
     * 出队线程条件队列, 当有多个线程, 会在此条件队列上等待.
     */
    private final Condition available = lock.newCondition();

    //...

}

上述比较特殊的是leader字段,我们之前已经说过,DelayQueue每次只会出队一个过期的元素,如果队首元素没有过期,就会阻塞出队线程,让线程在available这个条件队列上无限等待。

为了提升性能,DelayQueue并不会让所有出队线程都无限等待,而是用leader保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦leader线程被唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在available条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的leader线程。这样,就避免了无效的等待,提升了性能。这其实是一种名为“Leader-Follower pattern”的多线程设计模式。

入队——put

put方法没有什么特别,由于是无界队列,所以也不会阻塞线程。

需要注意的是当首次入队元素时,需要唤醒一个出队线程,因为此时可能已有出队线程在空队列上等待了,如果不唤醒,会导致出队线程永远无法执行。

/**
 * 入队一个指定元素e.
 * 由于是无界队列, 所以该方法并不会阻塞线程.
 */
public void put(E e) {
    offer(e);
}

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);             // 调用PriorityQueue的offer方法
        if (q.peek() == e) {    // 如果入队元素在队首, 则唤醒一个出队线程
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

if (q.peek() == e) {    // 如果入队元素在队首, 则唤醒一个出队线程
    leader = null;
    available.signal();
}

出队——take

整个take方法在一个自旋中完成,其实就分为两种情况:

1.队列为空

这种情况直接阻塞出队线程。(在available条件队列等待)

2.队列非空

队列非空时,还要看队首元素的状态(有效期),如果队首元素过期了,那直接出队就行了;如果队首元素未过期,就要看当前线程是否是第一个到达的出队线程(即判断leader是否为空),如果不是,就无限等待,如果是,则限时等待。

需要注意,自旋结束后如果leader == null && q.peek() != null,需要唤醒一个等待中的出队线程。leader == null && q.peek() != null的含义就是——没有leader线程但队列中存在元素。我们之前说了,leader线程作用之一就是用来唤醒其它无限等待的线程,所以必须要有这个判断。

/**
 * 队首出队元素.
 * 如果队首元素(堆顶)未到期或队列为空, 则阻塞线程.
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (; ; ) {
            E first = q.peek();     // 读取队首元素
            if (first == null)      // CASE1: 队列为空, 直接阻塞
                available.await();
            else {                  // CASE2: 队列非空
                long delay = first.getDelay(NANOSECONDS);
                if (delay  0)
                return 1;

            else if (this.seqno < x.seqno)    // 剩余时间相同则比较序号
                return -1;
            else
                return 1;
        }

        // 一般不会执行到此处,除非元素不是Data类型
        long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    @Override
    public String toString() {
        return "Data{" +
            "time=" + time +
            ", seqno=" + seqno +
            "}, isValid=" + isValid();
    }

    private boolean isValid() {
        return this.getDelay(TimeUnit.NANOSECONDS) > 0;
    }
}

关于队列元素Data类,需要注意以下几点:

  1. 每个元素的time字段保存失效时间点)的纳秒形式(构造时指定,比如当前时间+60s);
  2. seqno字段表示元素序号,每个元素唯一,仅用于失效时间点一致的元素之间的比较。
  3. getDelay方法返回元素的剩余有效时间,可以根据入参的TimeUnit选择时间的表示形式(秒、微妙、纳秒等),一般选择纳秒以提高精度;
  4. compareTo方法用于比较两个元素的大小,以便在队列中排序。由于DelayQueue基于优先级队列实现,所以内部是“堆”的形式,我们定义的规则是先失效的元素将先出队,所以先失效元素应该在堆顶,即compareTo方法返回结果 students = new DelayQueue(); Random random = new Random(); for (int i = 0; i < studentNumber; i++) { students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch)); } Thread teacherThread =new Thread(new Teacher(students)); students.put(new EndExam(students, 120,countDownLatch,teacherThread)); teacherThread.start(); countDownLatch.await(); System.out.println(" 考试时间到,全部交卷!"); } } class Student implements Runnable,Delayed{ private String name; private long workTime; private long submitTime; private boolean isForce = false; private CountDownLatch countDownLatch; public Student(){} public Student(String name,long workTime,CountDownLatch countDownLatch){ this.name = name; this.workTime = workTime; //提交时间 = 当前时间 + 作答时间 this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime(); this.countDownLatch = countDownLatch; } @Override public int compareTo(Delayed o) { // 按照作答时长正序排序(队头放的是你认为最先需要处理的元素,在这里体现为需要最先交卷,所以是正序) if(o == null || ! (o instanceof Student)) return 1; if(o == this) return 0; Student s = (Student)o; if (this.workTime > s.workTime) { return 1; }else if (this.workTime == s.workTime) { return 0; }else { return -1; } } @Override public long getDelay(TimeUnit unit) { // 提交时间 - 当前时间 用来判断延迟是否到期(即是否可以提交试卷,可以进行take或者poll) // 返回正数:延迟还有多少时间到期。负数:延迟已经在多长时间前到期。负数代表可以take或者poll return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public void run() { if (isForce) { System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" ); }else { System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 "+workTime +" 分钟"); } countDownLatch.countDown(); } public boolean isForce() { return isForce; } public void setForce(boolean isForce) { this.isForce = isForce; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getWorkTime() { return workTime; } public void setWorkTime(long workTime) { this.workTime = workTime; } public long getSubmitTime() { return submitTime; } public void setSubmitTime(long submitTime) { this.submitTime = submitTime; } } class EndExam extends Student{ private DelayQueue students; private CountDownLatch countDownLatch; private Thread teacherThread; public EndExam(DelayQueue students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) { super("强制收卷", workTime,countDownLatch); this.students = students; this.countDownLatch = countDownLatch; this.teacherThread = teacherThread; } @Override public void run() { teacherThread.interrupt(); Student tmpStudent; for (Iterator iterator2 = students.iterator(); iterator2.hasNext();) { tmpStudent = iterator2.next(); tmpStudent.setForce(true); System.out.println(tmpStudent.getName()+"==="+tmpStudent.getDelay(TimeUnit.NANOSECONDS)); tmpStudent.run(); } countDownLatch.countDown(); } } class Teacher implements Runnable{ private DelayQueue students; public Teacher(DelayQueue students){ this.students = students; } @Override public void run() { try { System.out.println(" test start"); while(!Thread.interrupted()){ Student s = students.take(); System.out.println(s.getName()+"==="+s.getDelay(TimeUnit.NANOSECONDS)); s.run(); } } catch (Exception e) { e.printStackTrace(); } } }

    场景二

    向缓存添加内容时,给每一个key设定过期时间,系统自动将超过过期时间的key清除。需要注意的是: 1、当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间 2、为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数 3、当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程

    public class Cache {
    
        public ConcurrentHashMap map = new ConcurrentHashMap();
        public DelayQueue queue = new DelayQueue();
        
        
        public void put(K k,V v,long liveTime){
            V v2 = map.put(k, v);
            DelayedItem tmpItem = new DelayedItem(k, liveTime);
            if (v2 != null) {
                queue.remove(tmpItem);
            }
            queue.put(tmpItem);
        }
        
        public Cache(){
            Thread t = new Thread(){
                @Override
                public void run(){
                    dameonCheckOverdueKey();
                }
            };
            t.setDaemon(true);
            t.start();
        }
        
        public void dameonCheckOverdueKey(){
            while (true) {
                DelayedItem delayedItem = queue.poll();
                if (delayedItem != null) {
                    map.remove(delayedItem.getT());
                    System.out.println(System.nanoTime()+" remove "+delayedItem.getT() +" from cache");
                }
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                    // TODO: handle exception
                }
            }
        }
        
        /**
         * TODO
         * @param args
         * 2014-1-11 上午11:30:36
         * @author:孙振超
         * @throws InterruptedException 
         */
        public static void main(String[] args) throws InterruptedException {
            Random random = new Random();
            int cacheNumber = 10;
            int liveTime = 0;
            Cache cache = new Cache();
            
            for (int i = 0; i < cacheNumber; i++) {
                liveTime = random.nextInt(3000);
                System.out.println(i+"  "+liveTime);
                cache.put(i+"", i, random.nextInt(liveTime));
                if (random.nextInt(cacheNumber) > 7) {
                    liveTime = random.nextInt(3000);
                    System.out.println(i+"  "+liveTime);
                    cache.put(i+"", i, random.nextInt(liveTime));
                }
            }
    
            Thread.sleep(3000);
            System.out.println();
        }
    
    }
    
    class DelayedItem implements Delayed{
    
        private T t;
        private long liveTime ;
        private long removeTime;
        
        public DelayedItem(T t,long liveTime){
            this.setT(t);
            this.liveTime = liveTime;
            this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime();
        }
        
        @Override
        public int compareTo(Delayed o) {
            if (o == null) return 1;
            if (o == this) return  0;
            if (o instanceof DelayedItem){
                DelayedItem tmpDelayedItem = (DelayedItem)o;
                if (liveTime > tmpDelayedItem.liveTime ) {
                    return 1;
                }else if (liveTime == tmpDelayedItem.liveTime) {
                    return 0;
                }else {
                    return -1;
                }
            }
            long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            return diff > 0 ? 1:diff == 0? 0:-1;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(removeTime - System.nanoTime(), unit);
        }
    
        public T getT() {
            return t;
        }
    
        public void setT(T t) {
            this.t = t;
        }
        @Override
        public int hashCode(){
            return t.hashCode();
        }
        
        @Override
        public boolean equals(Object object){
            if (object instanceof DelayedItem) {
                return object.hashCode() == hashCode() ?true:false;
            }
            return false;
        }
        
    }

     

    java源码

    Modifier and TypeMethod and Descriptionbooleanadd(E e)

    将指定的元素插入到此延迟队列中。

    voidclear()

    从此延迟队列中原子地删除所有元素。

    intdrainTo(Collection c)

    如果此集合包含指定 集合中的所有元素,则返回true。

    booleanisEmpty()

    如果此集合不包含元素,则返回 true 。

    abstract Iteratoriterator()

    返回包含在该集合中的元素的迭代器。

    booleanremove(Object o)

    从该集合中删除指定元素的单个实例(如果存在)(可选操作)。

    booleanremoveAll(Collection c)

    删除指定集合中包含的所有此集合的元素(可选操作)。

    booleanretainAll(Collection c)

    仅保留此集合中包含在指定集合中的元素(可选操作)。

    abstract intsize()

    返回此集合中的元素数。

    Object[]toArray()

    返回一个包含此集合中所有元素的数组。

     T[]toArray(T[] a)

    返回包含此集合中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型。

    StringtoString()

    返回此集合的字符串表示形式。

    package java.util;
    
    public abstract class AbstractCollection implements Collection {
    
        protected AbstractCollection() {
        }
    
        public abstract Iterator iterator();
    
        public abstract int size();
    
        public boolean isEmpty() {
            return size() == 0;
        }
    
        public boolean contains(Object o) {
            Iterator it = iterator();
            if (o==null) {
                while (it.hasNext())
                    if (it.next()==null)
                        return true;
            } else {
                while (it.hasNext())
                    if (o.equals(it.next()))
                        return true;
            }
            return false;
        }
    
        public Object[] toArray() {
            // Estimate size of array; be prepared to see more or fewer elements
            Object[] r = new Object[size()];
            Iterator it = iterator();
            for (int i = 0; i < r.length; i++) {
                if (! it.hasNext()) // fewer elements than expected
                    return Arrays.copyOf(r, i);
                r[i] = it.next();
            }
            return it.hasNext() ? finishToArray(r, it) : r;
        }
    	
        @SuppressWarnings("unchecked")
        public  T[] toArray(T[] a) {
            // Estimate size of array; be prepared to see more or fewer elements
            int size = size();
            T[] r = a.length >= size ? a :
                      (T[])java.lang.reflect.Array
                      .newInstance(a.getClass().getComponentType(), size);
            Iterator it = iterator();
    
            for (int i = 0; i < r.length; i++) {
                if (! it.hasNext()) { // fewer elements than expected
                    if (a == r) {
                        r[i] = null; // null-terminate
                    } else if (a.length < i) {
                        return Arrays.copyOf(r, i);
                    } else {
                        System.arraycopy(r, 0, a, 0, i);
                        if (a.length > i) {
                            a[i] = null;
                        }
                    }
                    return a;
                }
                r[i] = (T)it.next();
            }
            // more elements than expected
            return it.hasNext() ? finishToArray(r, it) : r;
        }	
    	
        private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;	
    	
        @SuppressWarnings("unchecked")
        private static  T[] finishToArray(T[] r, Iterator it) {
            int i = r.length;
            while (it.hasNext()) {
                int cap = r.length;
                if (i == cap) {
                    int newCap = cap + (cap >> 1) + 1;
                    // overflow-conscious code
                    if (newCap - MAX_ARRAY_SIZE > 0)
                        newCap = hugeCapacity(cap + 1);
                    r = Arrays.copyOf(r, newCap);
                }
                r[i++] = (T)it.next();
            }
            // trim if overallocated
            return (i == r.length) ? r : Arrays.copyOf(r, i);
        }
    
        private static int hugeCapacity(int minCapacity) {
            if (minCapacity < 0) // overflow
                throw new OutOfMemoryError
                    ("Required array size too large");
            return (minCapacity > MAX_ARRAY_SIZE) ?
                Integer.MAX_VALUE :
                MAX_ARRAY_SIZE;
        }	
    	
        public boolean add(E e) {
            throw new UnsupportedOperationException();
        }	
    	
        public boolean remove(Object o) {
            Iterator it = iterator();
            if (o==null) {
                while (it.hasNext()) {
                    if (it.next()==null) {
                        it.remove();
                        return true;
                    }
                }
            } else {
                while (it.hasNext()) {
                    if (o.equals(it.next())) {
                        it.remove();
                        return true;
                    }
                }
            }
            return false;
        }	
    	
        public boolean containsAll(Collection c) {
            for (Object e : c)
                if (!contains(e))
                    return false;
            return true;
        }	
    	
        public boolean addAll(Collection c) {
            Objects.requireNonNull(c);
            boolean modified = false;
            Iterator it = iterator();
            while (it.hasNext()) {
                if (c.contains(it.next())) {
                    it.remove();
                    modified = true;
                }
            }
            return modified;
        }	
    	
        public boolean retainAll(Collection c) {
            Objects.requireNonNull(c);
            boolean modified = false;
            Iterator it = iterator();
            while (it.hasNext()) {
                if (!c.contains(it.next())) {
                    it.remove();
                    modified = true;
                }
            }
            return modified;
        }
    	
        public void clear() {
            Iterator it = iterator();
            while (it.hasNext()) {
                it.next();
                it.remove();
            }
        }	
    	
        public String toString() {
            Iterator it = iterator();
            if (! it.hasNext())
                return "[]";
    
            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = it.next();
                sb.append(e == this ? "(this Collection)" : e);
                if (! it.hasNext())
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        }
    
    }	
    Modifier and TypeMethod and Descriptionbooleanadd(E e)

    将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制, true在成功后返回 IllegalStateException如果当前没有可用空间,则抛出IllegalStateException。

    booleancontains(Object o)

    如果此队列包含指定的元素,则返回 true

    intdrainTo(Collection
关注
打赏
1665965058
查看更多评论
0.0441s