在许多业务场景中,我们都会碰到延迟任务,定时任务这种需求。特别的,在网络连接的场景中,常常会出现一些超时控制。由于服务端的连接数量很大,这些超时任务的数量往往也是很庞大的。实现对大量任务的超时管理并不是一个容易的事情。
在本篇Chat中,我们会分析实现延迟任务的几种常见算法和数据结构。并且引入介绍一种更高性能的支撑延迟任务场景的数据结构:时间轮。并且最后以 Netty 中的代码实现进行分析,完成从利润到代码的落地。
心跳与超时:高并发高性能的时间轮超时器@
- 心跳与超时:高并发高性能的时间轮超时器
- 引言
- JDK 原生提供的超时任务支持
- java.util.Timer
- ScheduledThreadPoolExecutor
- 更高效的数据结构
- 基本原理
- 支撑更多超过范围的延迟时间
- 方案一:不同轮次的延迟任务共存相同的延迟队列
- 方案二:多层次时间轮
- Netty 的时间轮实现
- 接口定义
- 构建循环数组
- 新增延迟任务
- 工作线程workerThread
- 线程启动与准备工作
- 驱动指针和任务触发
- 时间轮停止
- 思考总结
在许多业务场景中,我们都会碰到延迟任务,定时任务这种需求。特别的,在网络连接的场景中,常常会出现一些超时控制。由于服务端的连接数量很大,这些超时任务的数量往往也是很庞大的。实现对大量任务的超时管理并不是一个容易的事情。
本章我们将介绍几种用于实现超时任务的数据结构,并且最后分析 Netty 在超时任务上采取的结构和代码。
欢迎加入技术交流群186233599讨论交流,也欢迎关注笔者公众号:风火说。
JDK 原生提供的超时任务支持 java.util.TimerJDK 在 1.3 的时候引入了Timer
数据结构用于实现定时任务。Timer
的实现思路比较简单,其内部有两个主要属性:
TaskQueue
:定时任务抽象类TimeTask
的列表。TimerThread
:用于执行定时任务的线程。
Timer
结构还定义了一个抽象类TimerTask
并且继承了Runnable
接口。业务系统实现了这个抽象类的run
方法用于提供具体的延时任务逻辑。
TaskQueue
内部采用大顶堆的方式,依据任务的触发时间进行排序。而TimerThread
则以死循环的方式从TaskQueue
获取队列头,等待队列头的任务的超时时间到达后触发该任务,并且将任务从队列中移除。
Timer
的数据结构和算法都很容易理解。所有的超时任务都首先进入延时队列。后台超时线程不断的从延迟队列中获取任务并且等待超时时间到达后执行任务。延迟队列采用大顶堆排序,在延迟任务的场景中有三种操作,分别是:添加任务,提取队列头任务,查看队列头任务。
查看队列头任务的事件复杂度是 O(1) 。而添加任务和提取队列头任务的时间复杂度都是 O(Log2n) 。当任务数量较大时,添加和删除的开销也是比较大的。此外,由于Timer
内部只有一个处理线程,如果有一个延迟任务的处理消耗了较多的时间,会对应的延迟后续任务的处理。
由于Timer
只有一个线程用来处理延迟任务,在任务数量很多的时候显然是不足够的。在 JDK1.5 引入线程池接口ExecutorService
后,也对应的提供了一个用于处理延时任务的ScheduledExecutorService
子类接口。该接口内部也一样使用了一个使用小顶堆进行排序的延迟队列存放任务。线程池中的线程会在这个队列上等待直到有任务可以提取。
ScheduledExecutorService
的实现上有一些特殊,只有一个线程能够提取到延迟队列头的任务,并且根据任务的超时时间进行等待。在这个等待期间,其他的线程是无法获取任务的。这样的实现是为了避免多个线程同时获取任务,导致超时时间未到达就任务触发或者在等待任务超时时间时有新的任务被加入而无法响应。
由于ScheduledExecutorService
可以使用多个线程,这样也缓解了因为个别任务执行时间长导致的后续任务被阻塞的情况。不过延迟队列也是一样采用小顶堆的排序方式,因此添加任务和删除任务的时间复杂度都是 O(Log2n) 。在任务数量很大的情况下,性能表现比较差。
虽然Timer
和ScheduledThreadPoolExecutor
都提供了对延迟任务的支撑能力,但是由于新增任务和提取任务的时间复杂度都是 O(Log2n) ,在任务数量很大,比如几万,十几万的时候,性能的开销就变得很巨大。
那么,是否存在新增任务和提取任务比 O(Log2n) 复杂度更低的数据结构呢?答案是存在的。在论文《Hashed and Hierarchical Timing Wheels》中设计了一种名为时间轮( Timing Wheels )的数据结构,这种结构在处理延迟任务时,其新增任务和删除任务的时间复杂度降低到了 O(1) 。
基本原理时间轮的数据结构很类似于我们钟表上的数据指针,故而得名时间轮。其数据结构用图示意如下
每一个时间“格子”我们称之为槽位,槽位中存放着延迟任务队列。槽位本身代表着一个时间单位,比如 1 秒。时间轮拥有的槽位个数就是该时间轮能够处理的最大延迟跨度的任务,槽位的时间单位代表着时间轮的精度。这意味着小于时间单位的时间在该时间轮是无法被区分的。
槽位上的延迟任务队列中的任务都有相同的延迟时间。每一个单位时间,指针都会移动到下一个槽位。当指针指向某一个槽位时,该槽位的延迟任务队列中的任务都会被触发。
当有一个延迟任务要插入时间轮时,首先计算其延迟时间与单位时间的余值,从指针指向的当前槽位移动余值的个数槽位,就是该延迟任务需要被放入的槽位。
举个例子,时间轮有8个槽位,编号为 0 ~ 7 。指针当前指向槽位 2 。新增一个延迟时间为 4 秒的延迟任务,4 % 8 = 4,因此该任务会被插入 4 + 2 = 6,也就是槽位6的延迟任务队列。
时间轮的槽位实现可以采用循环数组的方式达成,也就是让指针在越过数组的边界后重新回到起始下标。概括来说,可以将时间轮的算法描述为
用队列来存储延迟任务,同一个队列中的任务,其延迟时间相同。用循环数组的方式来存储元素,数组中的每一个元素都指向一个延迟任务队列。
有一个当前指针指向数组中的某一个槽位,每间隔一个单位时间,指针就移动到下一个槽位。被指针指向的槽位的延迟队列,其中的延迟任务全部被触发。
在时间轮中新增一个延迟任务,将其延迟时间除以单位时间得到的余值,从当前指针开始,移动余值对应个数的槽位,就是延迟任务被放入的槽位。
基于这样的数据结构,插入一个延迟任务的时间复杂度就下降到 O(1) 。而当指针指向到一个槽位时,该槽位连接的延迟任务队列中的延迟任务全部被触发。
延迟任务的触发和执行不应该影响指针向后移动的时间精确性。因此一般情况下,用于移动指针的线程只负责任务的触发,任务的执行交由其他的线程来完成。比如,可以将槽位上的延迟任务队列放入到额外的线程池中执行,然后在槽位上新建一个空白的新的延迟任务队列用于后续任务的添加。
支撑更多超过范围的延迟时间在基本原理中我们分析了时间轮的基础结构。不过当时我们假设需要插入的延迟任务的时间不会超过时间轮的长度,也就是说每一个槽位上的延迟任务队列中的任务的延迟时间都是相同的。
在这种情况下,要支持更大时间跨度的延迟任务,要么增加时间轮的槽位数,要么减少时间轮的精度,也就是每一个槽位代表的单位时间。时间轮的精度显然是一个业务上的硬性要求,那么只能增加槽位数。假设要求精度为 1 秒,要能支持延迟时间为 1 天的延迟任务,时间轮的槽位数需要 60 × 60 × 24 = 86400 。这就需要消耗更多的内存。显然,单纯增加槽位数并不是一个好的解决方案。
在论文中,针对大跨度的延迟任务支持,提供了两种扩展方案。
方案一:不同轮次的延迟任务共存相同的延迟队列在该方案中,算法引入了“轮次”的概念,延迟任务的延迟时间除以时间轮长度得到的商值为轮次。延迟任务的延迟时间除以时间轮长度得到的余数为要插入的槽位偏移量。
当插入延迟任务时首先计算轮次和槽位偏移量,通过槽位偏移量确定延迟任务插入的槽位。当指针指向某一个槽位时,对槽位指向的延迟任务队列进行遍历,其中轮次为0的延迟任务全部触发,其余任务则等待下一个周期。
通过引入轮次,就可以在有限的槽位上支持无穷时间范围的延迟任务。但是虽然插入任务的时间复杂度仍然是 O(1) ,但是在延迟任务触发时却需要遍历延迟任务队列来确认其轮次是否为0。任务触发时的时间复杂却上升为了 O(n) 。
对于这个情况,还有一个变化的细节可以采用,就是将延迟任务队列按照轮次进行排序,比方说使用小顶堆对延迟任务队列进行排序。这样,当指针指向一个槽位触发延迟任务时,只需要不断的从队列头取出任务进行轮次检查,一旦任务轮次不等于0就可以停止。任务触发的时间复杂度下降为 O(1) 。对应的,由于队列是排序的了,任务插入的时候除了需要定位插入的槽位,还需要定位在队列中的插入位置。插入的时间复杂度变化为 O(1) 和 O(Log2n) ,n 为该槽位上延迟任务队列的长度。
方案二:多层次时间轮看看手表的设计,有秒针,分针,时针。像秒针与分针,虽然都有 60 格 ,但是各自的格子代表的时间长度不同。参考这个思路,我们可以声明多个不同层级的时间轮,每一个时间轮的槽位的时间跨度是其次级时间轮的整体时间范围。
当低层级的时间轮的指针完整的走完一圈,其对应的高层级时间轮对应的移动一个槽位。并且高层级时间轮指针指向的槽位中的任务按照延迟时间计算,重新放入到低层级时间轮的不同槽位中。这样的方式,保证了每一个时间轮中的每一个槽位的延迟任务队列中的任务都具备相同时间精度的延迟时间。
以精度为 1 秒,时间范围为 1 天的时间轮为例子,可以设计三级时间轮:秒级时间轮有 60 个槽位,每个槽位的时间为 1 秒;分钟级时间轮有 60 个槽位,每个槽位的时间为 60 秒;小时级时间轮有24个槽位,每个槽位的时间为 60 分钟。当秒级时间轮走完 60 秒后,秒级时间轮的指针再次指向下标为0的槽位,而分钟级时间轮的指针向后移动一个槽位,并且将该槽位上的延迟任务全部取出并且重新计算后放入秒级时间轮。
总共只需要 60 + 60 + 24 = 144 个槽位即可支撑。对比上面提到的单级时间轮需要 86400 个槽位而言,节省了相当的内存。
层级时间轮有两种常见的做法:
- 固定时间范围:时间轮的个数,以及不同层级的时间轮的槽位数是通过构造方法的入参指定,这意味着时间轮整体能够支撑的时间范围是在构造方法的时候被确定。
- 非固定时间范围:定义好一个时间轮的槽位个数,以及最小的时间轮的槽位时间。当插入的延迟任务的时间超过时间轮范围时则动态生成更高层级的时间轮。由于时间轮是在运行期生成,并且根据任务的延迟时间计算,当已经存在的时间轮不满足其延迟时间范围要求时,动态生成高层级时间轮,因此整体能够支撑的时间范围是没有上限的。
时间轮算法的核心思想就是通过循环数组和指针移动的方式,将新增延迟任务的时间复杂度下降到 O(1) ,但是在具体实现上,包括如何处理更大时间跨度的延迟任务上,各家不同的实现都会有一些细节上的变化。下面我们以 Netty 中都时间轮实现为例子来进行代码分析。
接口定义Netty 的实现自定义了一个超时器的接口io.netty.util.Timer
,其方法如下
public interface Timer{ //新增一个延时任务,入参为定时任务TimerTask,和对应的延迟时间 Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); //停止时间轮的运行,并且返回所有未被触发的延时任务 Set < Timeout > stop();}public interface Timeout{ Timer timer(); TimerTask task(); boolean isExpired(); boolean isCancelled(); boolean cancel();}
Timeout
接口是对延迟任务的一个封装,其接口方法说明其实现内部需要维持该延迟任务的状态。后续我们分析其实现内部代码时可以更容易的看到。
Timer
接口有唯一实现HashedWheelTimer
。首先来看其构造方法,如下
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts){ //省略代码,省略参数非空检查内容。 wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; //省略代码,省略槽位时间范围检查,避免溢出以及小于 1 毫秒。 workerThread = threadFactory.newThread(worker); //省略代码,省略资源泄漏追踪设置以及时间轮实例个数检查}
首先是方法createWheel
,用于创建时间轮的核心数据结构,循环数组。来看下其方法内容
private static HashedWheelBucket[] createWheel(int ticksPerWheel){ //省略代码,确认 ticksPerWheel 处于正确的区间 //将 ticksPerWheel 规范化为 2 的次方幂大小。 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for(int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket(); } return wheel;}
数组的长度为 2 的次方幂方便进行求商和取余计算。
HashedWheelBucket
内部存储着由HashedWheelTimeout
节点构成的双向链表,并且存储着链表的头节点和尾结点,方便于任务的提取和插入。
方法HashedWheelTimer#newTimeout
用于新增延迟任务,下面来看下代码
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit){ //省略代码,用于参数检查 start(); long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; if(delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout;}
可以看到,在新增任务的时候,任务并不是直接进入到循环数组中,而是首先被放入到一个队列,也就是属性timeouts
,该队列是一个 MPSC 类型的队列,采用这个模式主要出于提升并发性能考虑,因为这个队列只有线程workerThread
会进行任务提取操作。
该线程是在构造方法中通过调用workerThread = threadFactory.newThread(worker)
被创建。但是创建之后并不是马上执行线程的start
方法,其启动的时机是这个时间轮第一次新增延迟任务的时候,也就是本方法中的start
方法的内容。下面是其代码
public void start(){ switch(WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } while(startTime == 0) { try { startTimeInitialized.await(); } catch(InterruptedException ignore) { // Ignore - it will be ready very soon. } }}
方法很明显的分为两个部分,第一部分为Switch
方法块,通过对状态变量的 CAS 操作,确保只有一个线程能够执行workerThread.start()
方法来启动工作线程,避免并发异常。第二部分为阻塞等待,通过CountDownLatch
类型变量startTimeInitialized
执行阻塞等待,用于等待工作线程workerThread
真正进入工作状态。
从newTimeout
方法的角度来看,插入延迟任务首先是放入队列中,之前分析数据结构的时候也说过任务的触发是指针指向时间轮中某个槽位时进行,那么必然存在一个需要将队列中的延迟任务放入到时间轮的数组之中的工作。这个动作显然就是就是由workerThread
工作线程来完成。下面就来看下这个线程的具体代码内容。
工作线程是依托于HashedWheelTimer.Worker
这个实现了Runnable
接口的类进行工作的,那下面看下其对run
方法的实现代码,如下
public void run(){ {//代码块① startTime = System.nanoTime(); if(startTime == 0) { //使用startTime==0 作为线程进入工作状态模式标识,因此这里重新赋值为1 startTime = 1; } //通知外部初始化工作线程的线程,工作线程已经启动完毕 startTimeInitialized.countDown(); } {//代码块② do { final long deadline = waitForNextTick(); if(deadline > 0) { int idx = (int)(tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); } {//代码块③ for(HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for(;;) { HashedWheelTimeout timeout = timeouts.poll(); if(timeout == null) { break; } if(!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }}
线程启动与准备工作
为了方便阅读,这边将run
方法的内容分为三个代码块。首先来看代码块①。通过系统调用System.nanoTime
为启动时间startTime
设置初始值,该变量代表了时间轮的基线时间,用于后续相对时间的计算。赋值完毕后,通过startTimeInitialized
变量对外部的等待线程进行通知。
接着来看代码块②。这是主要的工作部分,整体是在一个while
循环中,确保工作线程只在时间轮没有被终止的时候工作。首先来看方法waitForNextTick
,在时间轮中,指针移动一次,称之为一个tick
,这个方法显然内部应该是用于等待指针移动到下一个tick
,来看具体代码,如下
private long waitForNextTick(){ long deadline = tickDuration * (tick + 1); for(;;) { final long currentTime = System.nanoTime() - startTime; long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; if(sleepTimeMs
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?