通过前面对EventLoop的学习,我们知道,一个EventLoop实例代表了一个Selector注册器,所有的事件都注册到这个Selector上。而所有的事件监听和执行呢,也都是在这个EventLoop上,前面的run()方法分析中已经说明过。
而NioEventLoop的父类SingleThreadEventLoop和SingleThreadEventExecutor则代表了一个任务执行器,其创建了唯一的线程用来执行被触发的事件。本文就来介绍下这个任务执行器及其线程状态的执行变化。
1.SingleThreadEventExecutor的基本属性public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 线程状态
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
// 执行任务队列
private final Queue taskQueue;
// 本文分析的重点,执行任务的那个线程
private volatile Thread thread;
...
}
2.线程的创建
这是最初的发问,我们迫切需要知道,这个唯一的执行任务的线程是什么时候被创建的呢?
在ServerBootStrap启动时,需要执行bind()方法绑定到具体端口,此时需要将NioServerSocketChannel注册到NioEventLoop上,在这里线程就被创建了,具体如下:
public abstract class AbstractBootstrap implements Cloneable {
final ChannelFuture initAndRegister() {
// 在这里,将NioServerSocketChannel注册到NioEventLoop上去,见2.1
ChannelFuture regFuture = config().group().register(channel);
...
}
}
2.1 MultithreadEventLoopGroup.register()
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
public ChannelFuture register(Channel channel) {
// NioEventLoopGroup中通过next()方法选择一个NioEventLoop来执行register()
return next().register(channel);
}
}
2.2 SingleThreadEventLoop.register()
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// AbstractUnsafe.register()来执行
promise.channel().unsafe().register(this, promise);
return promise;
}
}
protected abstract class AbstractUnsafe implements Unsafe {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 判断当前线程是否NioEventLoop中的那个执行线程
// 目前执行线程还未被创建,所有是false,执行else逻辑
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 交由execute方法来注册
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
}
2.3 SingleThreadEventExecutor.execute()
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
// 同样的,当前执行线程还未创建,返回false
if (!inEventLoop) {
// 通过startThread方法来创建执行线程
startThread();
...
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
}
2.3.1 SingleThreadEventExecutor.startThread()创建执行线程
private void startThread() {
// 需要判断线程状态,如果已开始创建,则不再重复创建
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 真正创建线程在这里
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 将当前线程赋值给执行线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 启动run()方法,在子类中实现,也就是NioEventLoop.run()方法,用于监听事件
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
}
}
}
总结:线程的创建实际上并不复杂,只是创建该线程的时机不太好找。
笔者的经验是先在SingleThreadEventExecutor中找到thread对象的创建代码,就找到了doStartThread()方法,通过对该方法的debug,然后启动服务端进程后,才发现在服务端进程启动后,将NioServerSocketChannel注册到NioEventLoop中才将该thread创建的。
多debug,终会发现创建时机的。
在这里,线程状态由ST_NOT_STARTED 变化为 ST_STARTED
3.线程的关闭线程的关闭有两个状态,分别是
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
我们来观察下ST_SHUTTING_DOWN状态被调用的地方
在shutdown()方法被设置的ST_SHUTTING_DOWN状态,但是改方法被设置为过期@Deprecated,也没有说明替代方法是什么,我们只能自己找了。
@Override
@Deprecated
public void shutdown() {
if (isShutdown()) {
return;
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return;
}
int newState;
wakeup = true;
oldState = state;
if (inEventLoop) {
// 在这里被设置的状态
newState = ST_SHUTDOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
case ST_SHUTTING_DOWN:
newState = ST_SHUTDOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
先不纠结,直接看另一个状态ST_SHUTTING_DOWN,我们找一下这个状态被调用的代码块
在shutdownGracefully()方法中,我们找到了,与shutdown方法相比,该方法更gracefully(优雅),所以在关闭NioEventLoop时,我们建议使用该方法。
@Override
public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
if (timeout < quietPeriod) {
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
}
ObjectUtil.checkNotNull(unit, "unit");
if (isShuttingDown()) {
return terminationFuture();
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture();
}
int newState;
wakeup = true;
oldState = state;
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}
if (wakeup) {
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
return terminationFuture();
}
上面的方法主要还是将线程状态转变为ST_SHUTTING_DOWN,那么是不是状态变化之后,就立即关闭所有的任务呢?
虽然没有看过代码,但是我们也能想象的到,既然是优雅关闭,那肯定不能直接关闭所有在执行的任务,那么具体如何做的呢?我们一步步看
3.1 线程优雅关闭的任务处理我们再重新看一下线程的开启的相关代码处理,就还是doStartThread()方法
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 看这里,我们在上面创建完thread对象之后,即调用了run方法来执行任务,默认由子类来实现
// 也就是NioEventLoop.run(),具体见3.2
// NioEventLoop.run()是一个死循环,只有当线程关闭时才会结束循环,后续才会执行到finally方法中
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 只有NioEventLoop.run()方法被关闭后才会执行到这里
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
// 通过这个英文注释我们知道,confirmShutdown()方法执行了所有遗留的task和shutdown 钩子函数
// 具体见3.3
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
}
3.2 NioEventLoop.run() 的线程关闭机制
之前我们都是关注NioEventLoop.run()方法中的任务执行,现在我们重点关注下线程关闭时的任务处理
public final class NioEventLoop extends SingleThreadEventLoop {
protected void run() {
int selectCnt = 0;
for (;;) {
try {
...
...
try {
// 判断当前线程状态是否ST_SHUTTING_DOWN
// 如果已被设置为停止状态,则执行closeAll()
if (isShuttingDown()) {
// 具体处理在这里
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
3.2.1 NioEventLoop.closeAll() 关闭所有Channel及Task
public final class NioEventLoop extends SingleThreadEventLoop {
private void closeAll() {
selectAgain();
Set keys = selector.keys();
Collection channels = new ArrayList(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
// 如果是task类型,则直接取消注册任务
k.cancel();
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch: channels) {
// 关闭所有的channel,未发送完的数据则继续发送,不再发送新的数据
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
}
3.3 SingleThreadEventExecutor.confirmShutdown() 执行所有未执行完成的task
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
// 取消定时任务
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// 所有已经在taskQueue中的任务则继续执行
// 设置的shutdown钩子函数也全部执行
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
taskQueue.offer(WAKEUP_TASK);
return false;
}
...
}
}
总结:线程的中止(优雅中止),在关闭之前还是做了很多事情的。对于已经存在于taskQueue中的任务需要执行完成,设置的shutdown钩子函数也需要执行完成。
并关闭所有注册在Selector上的Channel。
4.线程中止状态在3中线程关闭完成之后,也就是doStartThread()方法的最后
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private void doStartThread() {
finally {
try {
cleanup();
} finally {
FastThreadLocal.removeAll();
// 最终将当前线程状态转变为ST_TERMINATED中止状态
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
中止状态比较好理解,当线程发起关闭方法,且所有任务执行完成后,自然就过渡到ST_TERMINATED中止态。
最后通过一张图来展示下EventLoop线程状态的变化:
当然ST_SHUTDOWN状态没有展示,不推荐使用shutdown()方法