深入学习java源码之DelayQueue.poll()与DelayQueue.peek()
DelayQueue
是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于已有的PriorityBlockingQueue实现
DelayQueue是阻塞队列中非常有用的一种队列,经常被用于缓存或定时任务等的设计,例如:
a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
笨办法是,使用一个后台线程,遍历所有对象,挨个检查。 但对象数量过多时,存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,多小则存在效率问题。而且做不到按超时的时间顺序处理。
这场景,使用DelayQueue最适合了。 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。 该队列的头部是延迟期满后保存时间最长的Delayed元素(即最想优先处理的元素)。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。 当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。 即使无法使用take或poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size方法同时返回到期和未到期元素的计数。此队列不允许使 null元素。
DelayQueue队列中保存的是实现了Delayed接口的实现类,里面必须实现getDelay()和compareTo()方法。前者用于取DelayQueue里面的元素时判断是否到了延时时间,否则不予获取,是则获取。 compareTo()方法用于进行队列内部的排序。compareTo 方法需提供与 getDelay 方法一致的排序。
可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。通过PriorityQueue,可以优先处理最紧急的元素,利用BlockingQueue,能防止不必要的不断轮询,提高了性能。在很多需要回收对象的场景都能用上。
DelayQueue也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue中的所有元素必须实现Delayed
接口:
/**
* 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
*
* 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
*/
public interface Delayed extends Comparable {
/**
* 返回与此对象相关的剩余有效时间,以给定的时间单位表示.
*/
long getDelay(TimeUnit unit);
}
可以看到,Delayed接口除了自身的getDelay
方法外,还实现了Comparable接口。getDelay方法用于返回对象的剩余有效时间,实现Comparable接口则是为了能够比较两个对象,以便排序。
也就是说,如果一个类实现了Delayed接口,当创建该类的对象并添加到DelayQueue中后,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。
另外,由于DelayQueue内部委托了PriorityBlockingQueue对象来实现所有方法,所以能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素。
DelayQueue的特点简要概括如下:
- DelayQueue是无界阻塞队列;
- 队列中的元素必须实现Delayed接口,元素过期后才会从队列中取走;
延迟队列DelayQueue,take()该方法的主要功能是从优先队列(PriorityQueue)取出一个最应该执行的任务(最优值),如果该任务的预订执行时间未到,则需要wait这段时间差。反之,如果时间到了,则返回该任务。而offer()方法是将一个任务添加到该队列中。
内部的PriorityQueue并非在构造时创建,而是对象创建时生成:
ublic class DelayQueue extends AbstractQueue
implements BlockingQueue {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue q = new PriorityQueue();
/**
* leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程.
* 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程
*/
private Thread leader = null;
/**
* 出队线程条件队列, 当有多个线程, 会在此条件队列上等待.
*/
private final Condition available = lock.newCondition();
//...
}
上述比较特殊的是leader
字段,我们之前已经说过,DelayQueue每次只会出队一个过期的元素,如果队首元素没有过期,就会阻塞出队线程,让线程在available
这个条件队列上无限等待。
为了提升性能,DelayQueue并不会让所有出队线程都无限等待,而是用leader
保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦leader线程被唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在available
条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的leader线程。这样,就避免了无效的等待,提升了性能。这其实是一种名为“Leader-Follower pattern”的多线程设计模式。
入队——put
put方法没有什么特别,由于是无界队列,所以也不会阻塞线程。
需要注意的是当首次入队元素时,需要唤醒一个出队线程,因为此时可能已有出队线程在空队列上等待了,如果不唤醒,会导致出队线程永远无法执行。
/**
* 入队一个指定元素e.
* 由于是无界队列, 所以该方法并不会阻塞线程.
*/
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e); // 调用PriorityQueue的offer方法
if (q.peek() == e) { // 如果入队元素在队首, 则唤醒一个出队线程
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
if (q.peek() == e) { // 如果入队元素在队首, 则唤醒一个出队线程
leader = null;
available.signal();
}
出队——take
整个take方法在一个自旋中完成,其实就分为两种情况:
1.队列为空
这种情况直接阻塞出队线程。(在available条件队列等待)
2.队列非空
队列非空时,还要看队首元素的状态(有效期),如果队首元素过期了,那直接出队就行了;如果队首元素未过期,就要看当前线程是否是第一个到达的出队线程(即判断leader
是否为空),如果不是,就无限等待,如果是,则限时等待。
需要注意,自旋结束后如果leader == null && q.peek() != null
,需要唤醒一个等待中的出队线程。leader == null && q.peek() != null
的含义就是——没有leader
线程但队列中存在元素。我们之前说了,leader线程作用之一就是用来唤醒其它无限等待的线程,所以必须要有这个判断。
/**
* 队首出队元素.
* 如果队首元素(堆顶)未到期或队列为空, 则阻塞线程.
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (; ; ) {
E first = q.peek(); // 读取队首元素
if (first == null) // CASE1: 队列为空, 直接阻塞
available.await();
else { // CASE2: 队列非空
long delay = first.getDelay(NANOSECONDS);
if (delay 0)
return 1;
else if (this.seqno < x.seqno) // 剩余时间相同则比较序号
return -1;
else
return 1;
}
// 一般不会执行到此处,除非元素不是Data类型
long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
@Override
public String toString() {
return "Data{" +
"time=" + time +
", seqno=" + seqno +
"}, isValid=" + isValid();
}
private boolean isValid() {
return this.getDelay(TimeUnit.NANOSECONDS) > 0;
}
}
关于队列元素Data类,需要注意以下几点:
- 每个元素的
time
字段保存失效时间点)的纳秒形式(构造时指定,比如当前时间+60s); seqno
字段表示元素序号,每个元素唯一,仅用于失效时间点一致的元素之间的比较。getDelay
方法返回元素的剩余有效时间,可以根据入参的TimeUnit选择时间的表示形式(秒、微妙、纳秒等),一般选择纳秒以提高精度;compareTo
方法用于比较两个元素的大小,以便在队列中排序。由于DelayQueue基于优先级队列实现,所以内部是“堆”的形式,我们定义的规则是先失效的元素将先出队,所以先失效元素应该在堆顶,即compareTo方法返回结果 students = new DelayQueue(); Random random = new Random(); for (int i = 0; i < studentNumber; i++) { students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch)); } Thread teacherThread =new Thread(new Teacher(students)); students.put(new EndExam(students, 120,countDownLatch,teacherThread)); teacherThread.start(); countDownLatch.await(); System.out.println(" 考试时间到,全部交卷!"); } } class Student implements Runnable,Delayed{ private String name; private long workTime; private long submitTime; private boolean isForce = false; private CountDownLatch countDownLatch; public Student(){} public Student(String name,long workTime,CountDownLatch countDownLatch){ this.name = name; this.workTime = workTime; //提交时间 = 当前时间 + 作答时间 this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime(); this.countDownLatch = countDownLatch; } @Override public int compareTo(Delayed o) { // 按照作答时长正序排序(队头放的是你认为最先需要处理的元素,在这里体现为需要最先交卷,所以是正序) if(o == null || ! (o instanceof Student)) return 1; if(o == this) return 0; Student s = (Student)o; if (this.workTime > s.workTime) { return 1; }else if (this.workTime == s.workTime) { return 0; }else { return -1; } } @Override public long getDelay(TimeUnit unit) { // 提交时间 - 当前时间 用来判断延迟是否到期(即是否可以提交试卷,可以进行take或者poll) // 返回正数:延迟还有多少时间到期。负数:延迟已经在多长时间前到期。负数代表可以take或者poll return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public void run() { if (isForce) { System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" ); }else { System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 "+workTime +" 分钟"); } countDownLatch.countDown(); } public boolean isForce() { return isForce; } public void setForce(boolean isForce) { this.isForce = isForce; } public String getName() { return name; } public void setName(String name) { this.name = name; } public long getWorkTime() { return workTime; } public void setWorkTime(long workTime) { this.workTime = workTime; } public long getSubmitTime() { return submitTime; } public void setSubmitTime(long submitTime) { this.submitTime = submitTime; } } class EndExam extends Student{ private DelayQueue students; private CountDownLatch countDownLatch; private Thread teacherThread; public EndExam(DelayQueue students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) { super("强制收卷", workTime,countDownLatch); this.students = students; this.countDownLatch = countDownLatch; this.teacherThread = teacherThread; } @Override public void run() { teacherThread.interrupt(); Student tmpStudent; for (Iterator iterator2 = students.iterator(); iterator2.hasNext();) { tmpStudent = iterator2.next(); tmpStudent.setForce(true); System.out.println(tmpStudent.getName()+"==="+tmpStudent.getDelay(TimeUnit.NANOSECONDS)); tmpStudent.run(); } countDownLatch.countDown(); } } class Teacher implements Runnable{ private DelayQueue students; public Teacher(DelayQueue students){ this.students = students; } @Override public void run() { try { System.out.println(" test start"); while(!Thread.interrupted()){ Student s = students.take(); System.out.println(s.getName()+"==="+s.getDelay(TimeUnit.NANOSECONDS)); s.run(); } } catch (Exception e) { e.printStackTrace(); } } }场景二
向缓存添加内容时,给每一个key设定过期时间,系统自动将超过过期时间的key清除。需要注意的是: 1、当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间 2、为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数 3、当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程
public class Cache { public ConcurrentHashMap map = new ConcurrentHashMap(); public DelayQueue queue = new DelayQueue(); public void put(K k,V v,long liveTime){ V v2 = map.put(k, v); DelayedItem tmpItem = new DelayedItem(k, liveTime); if (v2 != null) { queue.remove(tmpItem); } queue.put(tmpItem); } public Cache(){ Thread t = new Thread(){ @Override public void run(){ dameonCheckOverdueKey(); } }; t.setDaemon(true); t.start(); } public void dameonCheckOverdueKey(){ while (true) { DelayedItem delayedItem = queue.poll(); if (delayedItem != null) { map.remove(delayedItem.getT()); System.out.println(System.nanoTime()+" remove "+delayedItem.getT() +" from cache"); } try { Thread.sleep(300); } catch (Exception e) { // TODO: handle exception } } } /** * TODO * @param args * 2014-1-11 上午11:30:36 * @author:孙振超 * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { Random random = new Random(); int cacheNumber = 10; int liveTime = 0; Cache cache = new Cache(); for (int i = 0; i < cacheNumber; i++) { liveTime = random.nextInt(3000); System.out.println(i+" "+liveTime); cache.put(i+"", i, random.nextInt(liveTime)); if (random.nextInt(cacheNumber) > 7) { liveTime = random.nextInt(3000); System.out.println(i+" "+liveTime); cache.put(i+"", i, random.nextInt(liveTime)); } } Thread.sleep(3000); System.out.println(); } } class DelayedItem implements Delayed{ private T t; private long liveTime ; private long removeTime; public DelayedItem(T t,long liveTime){ this.setT(t); this.liveTime = liveTime; this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem){ DelayedItem tmpDelayedItem = (DelayedItem)o; if (liveTime > tmpDelayedItem.liveTime ) { return 1; }else if (liveTime == tmpDelayedItem.liveTime) { return 0; }else { return -1; } } long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return diff > 0 ? 1:diff == 0? 0:-1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.nanoTime(), unit); } public T getT() { return t; } public void setT(T t) { this.t = t; } @Override public int hashCode(){ return t.hashCode(); } @Override public boolean equals(Object object){ if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ?true:false; } return false; } }
java源码
Modifier and TypeMethod and Descriptionboolean
add(E e)
将指定的元素插入到此延迟队列中。
void
clear()
从此延迟队列中原子地删除所有元素。
int
drainTo(Collection c)
如果此集合包含指定 集合中的所有元素,则返回true。
boolean
isEmpty()
如果此集合不包含元素,则返回 true 。
abstract Iterator
iterator()
返回包含在该集合中的元素的迭代器。
boolean
remove(Object o)
从该集合中删除指定元素的单个实例(如果存在)(可选操作)。
boolean
removeAll(Collection c)
删除指定集合中包含的所有此集合的元素(可选操作)。
boolean
retainAll(Collection c)
仅保留此集合中包含在指定集合中的元素(可选操作)。
abstract int
size()
返回此集合中的元素数。
Object[]
toArray()
返回一个包含此集合中所有元素的数组。
T[]
toArray(T[] a)
返回包含此集合中所有元素的数组; 返回的数组的运行时类型是指定数组的运行时类型。
String
toString()
返回此集合的字符串表示形式。
Modifier and TypeMethod and Descriptionpackage java.util; public abstract class AbstractCollection implements Collection { protected AbstractCollection() { } public abstract Iterator iterator(); public abstract int size(); public boolean isEmpty() { return size() == 0; } public boolean contains(Object o) { Iterator it = iterator(); if (o==null) { while (it.hasNext()) if (it.next()==null) return true; } else { while (it.hasNext()) if (o.equals(it.next())) return true; } return false; } public Object[] toArray() { // Estimate size of array; be prepared to see more or fewer elements Object[] r = new Object[size()]; Iterator it = iterator(); for (int i = 0; i < r.length; i++) { if (! it.hasNext()) // fewer elements than expected return Arrays.copyOf(r, i); r[i] = it.next(); } return it.hasNext() ? finishToArray(r, it) : r; } @SuppressWarnings("unchecked") public T[] toArray(T[] a) { // Estimate size of array; be prepared to see more or fewer elements int size = size(); T[] r = a.length >= size ? a : (T[])java.lang.reflect.Array .newInstance(a.getClass().getComponentType(), size); Iterator it = iterator(); for (int i = 0; i < r.length; i++) { if (! it.hasNext()) { // fewer elements than expected if (a == r) { r[i] = null; // null-terminate } else if (a.length < i) { return Arrays.copyOf(r, i); } else { System.arraycopy(r, 0, a, 0, i); if (a.length > i) { a[i] = null; } } return a; } r[i] = (T)it.next(); } // more elements than expected return it.hasNext() ? finishToArray(r, it) : r; } private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; @SuppressWarnings("unchecked") private static T[] finishToArray(T[] r, Iterator it) { int i = r.length; while (it.hasNext()) { int cap = r.length; if (i == cap) { int newCap = cap + (cap >> 1) + 1; // overflow-conscious code if (newCap - MAX_ARRAY_SIZE > 0) newCap = hugeCapacity(cap + 1); r = Arrays.copyOf(r, newCap); } r[i++] = (T)it.next(); } // trim if overallocated return (i == r.length) ? r : Arrays.copyOf(r, i); } private static int hugeCapacity(int minCapacity) { if (minCapacity < 0) // overflow throw new OutOfMemoryError ("Required array size too large"); return (minCapacity > MAX_ARRAY_SIZE) ? Integer.MAX_VALUE : MAX_ARRAY_SIZE; } public boolean add(E e) { throw new UnsupportedOperationException(); } public boolean remove(Object o) { Iterator it = iterator(); if (o==null) { while (it.hasNext()) { if (it.next()==null) { it.remove(); return true; } } } else { while (it.hasNext()) { if (o.equals(it.next())) { it.remove(); return true; } } } return false; } public boolean containsAll(Collection c) { for (Object e : c) if (!contains(e)) return false; return true; } public boolean addAll(Collection c) { Objects.requireNonNull(c); boolean modified = false; Iterator it = iterator(); while (it.hasNext()) { if (c.contains(it.next())) { it.remove(); modified = true; } } return modified; } public boolean retainAll(Collection c) { Objects.requireNonNull(c); boolean modified = false; Iterator it = iterator(); while (it.hasNext()) { if (!c.contains(it.next())) { it.remove(); modified = true; } } return modified; } public void clear() { Iterator it = iterator(); while (it.hasNext()) { it.next(); it.remove(); } } public String toString() { Iterator it = iterator(); if (! it.hasNext()) return "[]"; StringBuilder sb = new StringBuilder(); sb.append('['); for (;;) { E e = it.next(); sb.append(e == this ? "(this Collection)" : e); if (! it.hasNext()) return sb.append(']').toString(); sb.append(',').append(' '); } } }
boolean
add(E e)
将指定的元素插入到此队列中,如果可以立即执行此操作而不违反容量限制,
true
在成功后返回IllegalStateException
如果当前没有可用空间,则抛出IllegalStateException。boolean
contains(Object o)
如果此队列包含指定的元素,则返回
true
。int
drainTo(Collection
关注打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?