您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-EventLoop线程状态变化

恐龙弟旺仔 发布时间:2021-12-16 12:35:55 ,浏览量:0

前言:

    通过前面对EventLoop的学习,我们知道,一个EventLoop实例代表了一个Selector注册器,所有的事件都注册到这个Selector上。而所有的事件监听和执行呢,也都是在这个EventLoop上,前面的run()方法分析中已经说明过。

    而NioEventLoop的父类SingleThreadEventLoop和SingleThreadEventExecutor则代表了一个任务执行器,其创建了唯一的线程用来执行被触发的事件。本文就来介绍下这个任务执行器及其线程状态的执行变化。

1.SingleThreadEventExecutor的基本属性
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    // 线程状态
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

    // 执行任务队列
    private final Queue taskQueue;

    // 本文分析的重点,执行任务的那个线程
    private volatile Thread thread;
	...
}
2.线程的创建

    这是最初的发问,我们迫切需要知道,这个唯一的执行任务的线程是什么时候被创建的呢?

    在ServerBootStrap启动时,需要执行bind()方法绑定到具体端口,此时需要将NioServerSocketChannel注册到NioEventLoop上,在这里线程就被创建了,具体如下:

public abstract class AbstractBootstrap implements Cloneable {
	final ChannelFuture initAndRegister() {
        // 在这里,将NioServerSocketChannel注册到NioEventLoop上去,见2.1
        ChannelFuture regFuture = config().group().register(channel);
        ...
    }
}
2.1 MultithreadEventLoopGroup.register()
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
	public ChannelFuture register(Channel channel) {
        // NioEventLoopGroup中通过next()方法选择一个NioEventLoop来执行register()
        return next().register(channel);
    }
}
2.2 SingleThreadEventLoop.register()
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
	public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // AbstractUnsafe.register()来执行
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

protected abstract class AbstractUnsafe implements Unsafe {

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // 判断当前线程是否NioEventLoop中的那个执行线程
        // 目前执行线程还未被创建,所有是false,执行else逻辑
    	if (eventLoop.inEventLoop()) {
                register0(promise);
        } else {
            try {
                // 交由execute方法来注册
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
               ...
            }
        }
    }
}
2.3 SingleThreadEventExecutor.execute()
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
	public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        // 同样的,当前执行线程还未创建,返回false
        if (!inEventLoop) {
            // 通过startThread方法来创建执行线程
            startThread();
            ...
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
}

2.3.1 SingleThreadEventExecutor.startThread()创建执行线程

private void startThread() {
    // 需要判断线程状态,如果已开始创建,则不再重复创建
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 真正创建线程在这里
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 将当前线程赋值给执行线程
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 启动run()方法,在子类中实现,也就是NioEventLoop.run()方法,用于监听事件
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            }
        }
    }
}

总结:线程的创建实际上并不复杂,只是创建该线程的时机不太好找。

笔者的经验是先在SingleThreadEventExecutor中找到thread对象的创建代码,就找到了doStartThread()方法,通过对该方法的debug,然后启动服务端进程后,才发现在服务端进程启动后,将NioServerSocketChannel注册到NioEventLoop中才将该thread创建的。

多debug,终会发现创建时机的。

在这里,线程状态由ST_NOT_STARTED 变化为 ST_STARTED

3.线程的关闭

线程的关闭有两个状态,分别是

    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;

我们来观察下ST_SHUTTING_DOWN状态被调用的地方

在shutdown()方法被设置的ST_SHUTTING_DOWN状态,但是改方法被设置为过期@Deprecated,也没有说明替代方法是什么,我们只能自己找了。

@Override
@Deprecated
public void shutdown() {
    if (isShutdown()) {
        return;
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return;
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            // 在这里被设置的状态
            newState = ST_SHUTDOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                case ST_SHUTTING_DOWN:
                    newState = ST_SHUTDOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }

先不纠结,直接看另一个状态ST_SHUTTING_DOWN,我们找一下这个状态被调用的代码块

在shutdownGracefully()方法中,我们找到了,与shutdown方法相比,该方法更gracefully(优雅),所以在关闭NioEventLoop时,我们建议使用该方法。

@Override
public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
            "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    ObjectUtil.checkNotNull(unit, "unit");

    if (isShuttingDown()) {
        return terminationFuture();
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);

    if (ensureThreadStarted(oldState)) {
        return terminationFuture;
    }

    if (wakeup) {
        taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {
            wakeup(inEventLoop);
        }
    }

    return terminationFuture();
}

上面的方法主要还是将线程状态转变为ST_SHUTTING_DOWN,那么是不是状态变化之后,就立即关闭所有的任务呢?

虽然没有看过代码,但是我们也能想象的到,既然是优雅关闭,那肯定不能直接关闭所有在执行的任务,那么具体如何做的呢?我们一步步看

3.1 线程优雅关闭的任务处理

我们再重新看一下线程的开启的相关代码处理,就还是doStartThread()方法

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 看这里,我们在上面创建完thread对象之后,即调用了run方法来执行任务,默认由子类来实现
                    // 也就是NioEventLoop.run(),具体见3.2
                    // NioEventLoop.run()是一个死循环,只有当线程关闭时才会结束循环,后续才会执行到finally方法中
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // 只有NioEventLoop.run()方法被关闭后才会执行到这里
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        // 通过这个英文注释我们知道,confirmShutdown()方法执行了所有遗留的task和shutdown 钩子函数
                        // 具体见3.3
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        confirmShutdown();
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
}
3.2 NioEventLoop.run() 的线程关闭机制

    之前我们都是关注NioEventLoop.run()方法中的任务执行,现在我们重点关注下线程关闭时的任务处理

public final class NioEventLoop extends SingleThreadEventLoop {
	protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                ...

                ...
            
            try {
                // 判断当前线程状态是否ST_SHUTTING_DOWN
                // 如果已被设置为停止状态,则执行closeAll()
                if (isShuttingDown()) {
                    // 具体处理在这里
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

3.2.1 NioEventLoop.closeAll() 关闭所有Channel及Task

public final class NioEventLoop extends SingleThreadEventLoop {
	private void closeAll() {
        selectAgain();
        Set keys = selector.keys();
        Collection channels = new ArrayList(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                // 如果是task类型,则直接取消注册任务
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask task = (NioTask) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            // 关闭所有的channel,未发送完的数据则继续发送,不再发送新的数据 
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }
}
3.3 SingleThreadEventExecutor.confirmShutdown() 执行所有未执行完成的task
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
	protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        // 取消定时任务
        cancelScheduledTasks();

        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }

        // 所有已经在taskQueue中的任务则继续执行
        // 设置的shutdown钩子函数也全部执行
        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            taskQueue.offer(WAKEUP_TASK);
            return false;
        }

        ...
    }
}

总结:线程的中止(优雅中止),在关闭之前还是做了很多事情的。对于已经存在于taskQueue中的任务需要执行完成,设置的shutdown钩子函数也需要执行完成。

并关闭所有注册在Selector上的Channel。

4.线程中止状态

    在3中线程关闭完成之后,也就是doStartThread()方法的最后

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
 
    private void doStartThread() {
		finally {
            try {
                cleanup();
            } finally {
                FastThreadLocal.removeAll();

                // 最终将当前线程状态转变为ST_TERMINATED中止状态
                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                threadLock.countDown();
                int numUserTasks = drainTasks();
                if (numUserTasks > 0 && logger.isWarnEnabled()) {
                    logger.warn("An event executor terminated with " +
                                "non-empty task queue (" + numUserTasks + ')');
                }
                terminationFuture.setSuccess(null);
            }
    }
}

中止状态比较好理解,当线程发起关闭方法,且所有任务执行完成后,自然就过渡到ST_TERMINATED中止态。

最后通过一张图来展示下EventLoop线程状态的变化:

当然ST_SHUTDOWN状态没有展示,不推荐使用shutdown()方法

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0384s