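As an excellent NIO framework, Netty does more than wrap the JDK NIO API to make it easier to use; it also works around some of the JDK's native bugs. Its workaround for the bug in which epoll drives the CPU to 100% is a classic example.
1. Scenario in which the bug occurs
The usual way to accept connections with an NIO ServerSocket looks like this: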
Selector selector = Selector.open();
while (true) {
    int count = selector.select();
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    // Handle the corresponding events
}
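In this infinite loop, selector.select() should normally block until at least one channel is ready, and then return the number of keys whose ready set was updated.
With this epoll bug, however, selector.select() is woken up even though the subsequent selector.selectedKeys() call yields nothing to process. The while loop therefore spins without ever blocking, and CPU usage climbs to 100%.
2. Bug analysis
This JDK epoll bug is already described in detail in the JDK issue tracker, see https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6670302 . The report also gives detailed steps for reproducing it: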
0. server waits for connection
1. client connects and write message
2. server accepts and register OP_READ
3. server reads message and remove OP_READ from interest op set
4. client close the connection
5. server write message (without any reading.. surely OP_READ is not set)
6. server's select wakes up infinitely with return value 0
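In short, when a client disconnects abruptly, epoll sets the event set returned for that socket to POLLHUP or POLLERR. Because the event set has changed, the Selector is woken up and selector.select() returns, but the subsequent pass over the SelectionKeys finds nothing to process, so the loop keeps spinning.
3. Bug fix
Netty works around this bug, and the approach is simple: a counter tracks how many times in a row the selector has spun without doing any work. Once the count reaches a configured threshold, Netty creates a new Selector and re-registers every Channel from the old Selector on the new one. The relevant code is as follows: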
// NioEventLoop.run
protected void run() {
    // Counts consecutive loop iterations (select calls)
    int selectCnt = 0;
    for (;;) {
        try {
            ...
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            // If ranTasks is true (tasks from the taskQueue were run) or strategy > 0
            // (Selector.select reported ready events), real work was done, so this was not an empty spin
            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                // Work was done, so reset the selectCnt counter to 0
                selectCnt = 0;
            // Otherwise the bug may have been triggered; check the value of selectCnt
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        }...
    }
}
// unexpectedSelectorWakeup
private boolean unexpectedSelectorWakeup(int selectCnt) {
    ...
    // Default threshold: SELECTOR_AUTO_REBUILD_THRESHOLD = 512
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // At or above the threshold the bug may have been triggered, so rebuild the selector (see 3.1)
        rebuildSelector();
        return true;
    }
    return false;
}
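The counting logic in the excerpt above can be isolated into a small standalone sketch. This is not Netty code: the class name SpinGuard and its method names are hypothetical, and the sketch only mirrors the idea of incrementing on every loop iteration, resetting when work was done, and signalling a rebuild once the threshold is reached.

```java
/**
 * Hypothetical helper (not part of Netty) that mirrors the selectCnt logic:
 * it counts consecutive loop iterations in which nothing useful happened.
 */
public final class SpinGuard {
    private final int threshold;
    private int selectCnt;

    public SpinGuard(int threshold) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
    }

    /**
     * Call once per event-loop iteration.
     *
     * @param readyKeys value returned by Selector.select()
     * @param ranTasks  whether any queued task was executed in this iteration
     * @return true if the caller should rebuild its Selector now
     */
    public boolean onIteration(int readyKeys, boolean ranTasks) {
        selectCnt++;
        if (readyKeys > 0 || ranTasks) {
            // Real work was done, so this was not an empty spin.
            selectCnt = 0;
            return false;
        }
        if (selectCnt >= threshold) {
            // Too many empty wakeups in a row: ask for a rebuild and start over.
            selectCnt = 0;
            return true;
        }
        return false;
    }

    /** Current number of consecutive empty iterations. */
    public int count() {
        return selectCnt;
    }
}
```

In a hand-written event loop, the caller would invoke onIteration(...) after each select() and swap in a fresh Selector whenever it returns true.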
3.1 NioEventLoop.rebuildSelector0(): rebuilding the Selector
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;
    if (oldSelector == null) {
        return;
    }
    try {
        // Create a new Selector
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Re-register the channels from the old selector on the new selector
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            int interestOps = key.interestOps();
            key.cancel();
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            ...
        }
    }
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    try {
        // Explicitly close the old selector
        oldSelector.close();
    } catch (Throwable t) {
        ...
    }
}
Rebuilding the selector is not complicated: create a new selector, re-register every channel that is still validly registered on the old selector, and then close the old selector.
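For readers who want to try the same idea outside Netty, here is a minimal sketch using only the JDK's java.nio.channels API. The class SelectorRebuilder and its rebuild method are hypothetical names introduced for illustration. The sketch assumes it is called from the thread that owns the selector, and it omits the per-channel error handling that Netty performs.

```java
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Hypothetical utility (not part of Netty) that migrates registrations to a fresh Selector. */
public final class SelectorRebuilder {
    private SelectorRebuilder() {
    }

    /**
     * Opens a new Selector, moves every valid registration from oldSelector
     * onto it (keeping interest ops and attachment), then closes oldSelector.
     */
    public static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        // cancel() only adds the key to the cancelled-key set, so iterating keys() here is safe.
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) {
                continue; // already cancelled or channel closed
            }
            SelectableChannel channel = key.channel();
            if (channel.keyFor(newSelector) != null) {
                continue; // already registered on the new selector
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel();
            channel.register(newSelector, interestOps, attachment);
        }
        oldSelector.close();
        return newSelector;
    }
}
```

The caller replaces its selector reference with the returned one, for example selector = SelectorRebuilder.rebuild(selector);, and continues its loop with the new instance.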