摘要
Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器),Selector 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。 避免了多线程之间的上下文切换导致的开销。
- 1) Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
- 2) 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
- 3) 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
- 4) 由于读写操作都是非阻塞的,这就可以充分提升 IO线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
- 5) 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升
- 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
- Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
- 将socketChannel注册到Selector上,register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
- 进一步得到各个 SelectionKey (有事件发生)
- 在通过 SelectionKey 反向获取SocketChannel , 方法 channel()
- 可以通过 得到的 channel , 完成业务处理
public abstract class Selector implements Closeable {
//得到一个选择器对象
public static Selector open();
//监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public int select(long timeout);
//从内部集合中得到所有的 SelectionKey
public Set selectedKeys();
//阻塞
selector.select()
//阻塞1000毫秒,在1000毫秒后返回
selector.select(1000);
//唤醒selector
selector.wakeup();
//不阻塞,立马返还
selector.selectNow();
}
NIO简单通信实战
Server端
package com.zhuangxiaoyan.nio.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* @Classname SelectorTest1
* @Description TODO
* @Date 2021/10/30 22:31
* @Created by xjl
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建serversocketchanel ->serversocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个selector对象
Selector selector = Selector.open();
//绑定端口
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//把serverSocketChannel注册到selector关心的时间是 Accpect
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//等待客户端连接
while (true) {
//如没有时间那就返回
if (selector.select(1000) == 0) {
//没有事件发生
System.out.println("服务器等待一秒 无连接");
continue;
}
//如果返回的是》0 获取到相关的集合 已经获取到关注的事件。通过selectionkey 反向获取通道
Set selectionKeys = selector.selectedKeys();
Iterator Keyiterator = selectionKeys.iterator();
while (Keyiterator.hasNext()) {
//获取到selectorkey
SelectionKey key = Keyiterator.next();
if (key.isAcceptable()) {
//表示有客户端连接
// 生成一个socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功");
System.out.println("客户端生成了一个连接,hash="+serverSocketChannel.hashCode());
//将socketchannel 设置为非阻塞
socketChannel.configureBlocking(false);
//将这个socketchanel 注册到selector 同时给SocketChannel 关联一个buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) {
//发生一个读取的时间 通过key 来反向后去channel
SocketChannel socketChannel = (SocketChannel) key.channel();
//获取到channel关联的buffer
ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
socketChannel.read(byteBuffer);
System.out.println("客户端的数据"+new String(byteBuffer.array()));
}
//手动删除集合中移动的selectionKey 防止重复
Keyiterator.remove();
}
}
}
}
Client端
package com.zhuangxiaoyan.nio.selector;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @Classname NIOClient
* @Description TODO
* @Date 2021/11/1 7:25
* @Created by xjl
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
//创建一个网络通道
SocketChannel socketChannel=SocketChannel.open();
//设置非阻塞模式
socketChannel.configureBlocking(false);
//提供服务端的ip
InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要,客户端不会阻塞,可以做其他工作");
}
}
//如果成功
String str="hell 庄小焱";
ByteBuffer buffer=ByteBuffer.wrap(str.getBytes());
//发送数据
socketChannel.write(buffer);
System.in.read();
}
}