在我们进行网络编程中,为了应对海量的连接及IO处理,多数会采用一种叫做Reactor的处理模型。那么什么是Reactor模型呢?它有什么魔力来处理那么大量的客户端连接呢?与之前提到的NIO,它们是一回事吗?
1.Reactor组件Reactor模型基于事件驱动。
什么是事件?客户端的连接、读写,这都可被称为事件。
在Reactor模型中定义了三种角色:
1.1 Acceptor用于处理客户端的连接,并将连接分发到处理链中
1.2 Reactor用于监听和分配事件,将事件分配到对应的Handler
1.3 Handler真正的处理程序,执行针对客户端的读写操作
不知道大家有没有一种感觉,这种所谓角色划分,不就是之前我们说的NIO嘛,Acceptor就是我们的Selector,之前的示例中没有明确定义Reactor角色,因为所有的处理都是在Selector对应的主线程中,Handler就是我们之前定义的处理读写的方法。
所以,我们今天所说的Reactor模型,究竟是哪里不同了呢?本质上就是这里的Reactor的角色定义,Reactor模型提供了几种不同的线程监听分配模型。
2.单Reactor单线程模型与我们之前的示例差不多,单个Reactor线程负责客户端的连接、并将读写事件分配到对应的Handler中。用一张图来表示就是(图片来自网络)
大致流程可分为如下几个步骤:
1)Reactor监听client的事件(其中acceptor就是Selector,用于监听client的连接事件)
2)监听到client连接事件后,将该连接注册到Selector上,并注册对应的读事件
3)监听到client的读写事件后,则Reactor将其dispatch(分配)到对应的Handler上(黄色的read decode等就是具体的事务处理程序)
总结:这个处理模型同我们在之前的博客中讲述的一样,无论监听连接、监听读写事件处理都是在Reactor主线程上,所以,我们称之为单Reactor单线程模型。
这种模型的优势就是:代码最简单,不需要过多的分层操作;
劣势则更明显:当出现大量的client连接、读写操作时,单线程很容易达到100%负载。尤其当读写操作出现阻塞时,会出现大量连接失败;
3.单Reactor多线程模型顾名思义,相比较2中的单Reactor单线程模式,这里使用了线程池来处理事件。(图片来自网络)
大致流程与2中的主要区别在于:
1)Reactor接收到读写事件后,将读取后的数据分发到ThreadPool来处理
2)在ThreadPool线程池中进行真正的业务处理(这是比较耗时的地方)
相比较2中,Reactor线程更专注与事件的分发,将耗时的业务处理阶段交由ThreadPool来处理,这样,可以提升整个Reactor模型的吞吐量。
这种模型的劣势有吗?
我们可以看到,Reactor线程承担了client的监听,client的数据读取和数据响应,当客户端连接达到更高级别(十万、百万级别),将这些操作全放到Reactor主线程中来操作,也会很快达到100%负载。
那么,我们还有办法来进行优化吗?
4.主从Reactor多线程模型这种模型相对于3的优化在于,我们使用一个Reactor主线程来专门用于监听client的连接,再使用多个subReactor线程来执行client的数据读取和数据响应。(图片来自网络)
相比较3中的优势:
我们将client的连接和client数据读写分开来,使用不同的线程来处理。
那么Reactor主线程就可以专心的用于监听client连接,此时便可以支持十万甚至百万级别的连接。
5.主从Reactor多线程模型示例最后,我们用一个示例来说明下主从Reactor线程池模型(代码来自https://blog.csdn.net/prestigeding/article/details/55100075 )
主要分为如下几个:
public class NioServer {
private static final int DEFAULT_PORT = 9080;
public static void main(String[] args) {
new Thread(new Acceptor()).start();
}
private static class Acceptor implements Runnable {
// main Reactor 线程池,用于处理客户端的连接请求
private static ExecutorService mainReactor = Executors.newSingleThreadExecutor();
@Override
public void run() {
// TODO Auto-generated method stub
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(DEFAULT_PORT));
//转发到 MainReactor反应堆
dispatch(ssc);
System.out.println("服务端成功启动。。。。。。");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void dispatch(ServerSocketChannel ssc) {
mainReactor.submit(new MainReactor(ssc));
}
}
}
MainReactor.java
public class MainReactor implements Runnable {
private Selector selector;
private SubReactorThreadGroup subReactorThreadGroup;
public MainReactor(SelectableChannel channel) {
try {
selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
subReactorThreadGroup = new SubReactorThreadGroup(4);
}
@Override
public void run() {
System.out.println("MainReactor is running");
// TODO Auto-generated method stub
while (!Thread.interrupted()) {
Set ops = null;
try {
selector.select(1000);
ops = selector.selectedKeys();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 处理相关事件
for (Iterator it = ops.iterator(); it.hasNext();) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isAcceptable()) { // 客户端建立连接
System.out.println("收到客户端的连接请求。。。");
ServerSocketChannel serverSc = (ServerSocketChannel) key.channel();// 这里其实,可以直接使用ssl这个变量
SocketChannel clientChannel = serverSc.accept();
clientChannel.configureBlocking(false);
subReactorThreadGroup.dispatch(clientChannel); // 转发该请求
}
} catch (Throwable e) {
e.printStackTrace();
System.out.println("客户端主动断开连接。。。。。。。");
}
}
}
}
}
SubReactorThreadGroup.java
public class SubReactorThreadGroup {
private static final AtomicInteger requestCounter = new AtomicInteger(); //请求计数器
private final int nioThreadCount; // 线程池IO线程的数量
private static final int DEFAULT_NIO_THREAD_COUNT;
private SubReactorThread[] nioThreads;
private ExecutorService businessExecutePool; //业务线程池
static {
// DEFAULT_NIO_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1
// ? 2 * (Runtime.getRuntime().availableProcessors() - 1 ) : 2;
DEFAULT_NIO_THREAD_COUNT = 4;
}
public SubReactorThreadGroup() {
this(DEFAULT_NIO_THREAD_COUNT);
}
public SubReactorThreadGroup(int threadCount) {
if(threadCount < 1) {
threadCount = DEFAULT_NIO_THREAD_COUNT;
}
businessExecutePool = Executors.newFixedThreadPool(threadCount);
this.nioThreadCount = threadCount;
this.nioThreads = new SubReactorThread[threadCount];
for(int i = 0; i < threadCount; i ++ ) {
this.nioThreads[i] = new SubReactorThread(businessExecutePool);
this.nioThreads[i].start(); //构造方法中启动线程,由于nioThreads不会对外暴露,故不会引起线程逃逸
}
System.out.println("Nio 线程数量:" + threadCount);
}
public void dispatch(SocketChannel socketChannel) {
if(socketChannel != null ) {
next().register(new NioTask(socketChannel, SelectionKey.OP_READ));
}
}
protected SubReactorThread next() {
return this.nioThreads[ requestCounter.getAndIncrement() % nioThreadCount ];
}
public static void main(String[] args) {
// TODO Auto-generated method stub
}
}
SubReactorThread.java
public class SubReactorThread extends Thread {
private Selector selector;
private ExecutorService businessExecutorPool;
private List taskList = new ArrayList(512);
private ReentrantLock taskMainLock = new ReentrantLock();
/**
* 业务线程池
* @param businessExecutorPool
*/
public SubReactorThread(ExecutorService businessExecutorPool) {
try {
this.businessExecutorPool = businessExecutorPool;
this.selector = Selector.open();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* socket channel
*/
public void register(NioTask task) {
if (task != null) {
try {
taskMainLock.lock();
taskList.add(task);
} finally {
taskMainLock.unlock();
}
}
}
@Override
public void run() {
while (!Thread.interrupted()) {
Set ops = null;
try {
selector.select(1000);
ops = selector.selectedKeys();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
continue;
}
// 处理相关事件
for (Iterator it = ops.iterator(); it.hasNext();) {
SelectionKey key = it.next();
it.remove();
try {
if (key.isWritable()) { // 向客户端发送请求
SocketChannel clientChannel = (SocketChannel) key
.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
clientChannel.write(buf);
System.out.println("服务端向客户端发送数据。。。");
// 重新注册读事件
clientChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) { // 接受客户端请求
System.out.println("服务端接收客户端连接请求。。。");
SocketChannel clientChannel = (SocketChannel) key
.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println(buf.capacity());
clientChannel.read(buf);//解析请求完毕
//转发请求到具体的业务线程;当然,这里其实可以向dubbo那样,支持转发策略,如果执行时间短,
//,比如没有数据库操作等,可以在io线程中执行。本实例,转发到业务线程池
dispatch(clientChannel, buf);
}
} catch (Throwable e) {
e.printStackTrace();
System.out.println("客户端主动断开连接。。。。。。。");
}
}
// 注册事件
if (!taskList.isEmpty()) {
try {
taskMainLock.lock();
for (Iterator it = taskList
.iterator(); it.hasNext();) {
NioTask task = it.next();
try {
SocketChannel sc = task.getSc();
if(task.getData() != null) {
sc.register(selector, task.getOp(), task.getData());
} else {
sc.register(selector, task.getOp());
}
} catch (Throwable e) {
e.printStackTrace();// ignore
}
it.remove();
}
} finally {
taskMainLock.unlock();
}
}
}
}
/**
* 此处的reqBuffer处于可写状态
* @param sc
* @param reqBuffer
*/
private void dispatch(SocketChannel sc, ByteBuffer reqBuffer) {
businessExecutorPool.submit( new Handler(sc, reqBuffer, this) );
}
}
Handler.java
public class Handler implements Runnable {
private static final byte[] b = "hello,服务器收到了你的信息。".getBytes(); // 服务端给客户端的响应
private SocketChannel sc;
private ByteBuffer reqBuffer;
private SubReactorThread parent;
public Handler(SocketChannel sc, ByteBuffer reqBuffer,
SubReactorThread parent) {
super();
this.sc = sc;
this.reqBuffer = reqBuffer;
this.parent = parent;
}
@Override
public void run() {
System.out.println("业务在handler中开始执行。。。");
// TODO Auto-generated method stub
//业务处理
reqBuffer.put(b);
parent.register(new NioTask(sc, SelectionKey.OP_WRITE, reqBuffer));
System.out.println("业务在handler中执行结束。。。");
}
}
NioTask.java
public class NioTask implements Serializable {
private SocketChannel sc;
private int op;
private Object data;
public NioTask(SocketChannel sc, int op) {
this.sc = sc;
this.op = op;
}
public NioTask(SocketChannel sc, int op, Object data) {
this(sc, op);
this.data = data;
}
public SocketChannel getSc() {
return sc;
}
public void setSc(SocketChannel sc) {
this.sc = sc;
}
public int getOp() {
return op;
}
public void setOp(int op) {
this.op = op;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
}
参考:
Netty学习之旅------线程模型前置篇Reactor反应堆设计模式实现(基于java.nio)_prestigeding的博客-CSDN博客