您当前的位置: 首页 > 

庄小焱

暂无认证

  • 2浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty——Selector的原理与实战

庄小焱 发布时间:2021-10-28 20:22:53 ,浏览量:2

摘要

Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器),Selector 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。 避免了多线程之间的上下文切换导致的开销。

  • 1) Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  • 2) 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  • 3) 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
  • 4) 由于读写操作都是非阻塞的,这就可以充分提升 IO线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  • 5) 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升
Selector的原理图

  • 当客户端连接时,会通过ServerSocketChannel 得到 SocketChannel
  • Selector 进行监听 select 方法, 返回有事件发生的通道的个数.
  • 将socketChannel注册到Selector上,register(Selector sel, int ops), 一个selector上可以注册多个SocketChannel
  • 注册后返回一个 SelectionKey, 会和该Selector 关联(集合)
  • 进一步得到各个 SelectionKey (有事件发生)
  • 在通过 SelectionKey 反向获取SocketChannel , 方法 channel()
  • 可以通过 得到的 channel , 完成业务处理
Selector类相关方法
public abstract class Selector implements Closeable {
    
    //得到一个选择器对象
    public static Selector open();

    //监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
    public int select(long timeout);

    //从内部集合中得到所有的 SelectionKey
    public Set selectedKeys();

    //阻塞
    selector.select()

    //阻塞1000毫秒,在1000毫秒后返回
    selector.select(1000);

    //唤醒selector
    selector.wakeup();
    
    //不阻塞,立马返还
    selector.selectNow();

}
NIO简单通信实战 Server端
package com.zhuangxiaoyan.nio.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * @Classname SelectorTest1
 * @Description TODO
 * @Date 2021/10/30 22:31
 * @Created by xjl
 */
public class NIOServer {
    public static void main(String[] args) throws IOException {
        //创建serversocketchanel ->serversocket
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //得到一个selector对象
        Selector selector = Selector.open();

        //绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));

        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);

        //把serverSocketChannel注册到selector关心的时间是 Accpect
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //等待客户端连接
        while (true) {
            //如没有时间那就返回
            if (selector.select(1000) == 0) {
                //没有事件发生
                System.out.println("服务器等待一秒 无连接");
                continue;
            }
            //如果返回的是》0 获取到相关的集合 已经获取到关注的事件。通过selectionkey 反向获取通道
            Set selectionKeys = selector.selectedKeys();

            Iterator Keyiterator = selectionKeys.iterator();

            while (Keyiterator.hasNext()) {
                //获取到selectorkey
                SelectionKey key = Keyiterator.next();
                if (key.isAcceptable()) {
                    //表示有客户端连接
                    // 生成一个socketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端连接成功");
                    System.out.println("客户端生成了一个连接,hash="+serverSocketChannel.hashCode());
                    //将socketchannel 设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将这个socketchanel 注册到selector 同时给SocketChannel 关联一个buffer
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }

                if (key.isReadable()) {
                    //发生一个读取的时间 通过key 来反向后去channel
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    //获取到channel关联的buffer
                    ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
                    socketChannel.read(byteBuffer);
                    System.out.println("客户端的数据"+new String(byteBuffer.array()));
                }

                //手动删除集合中移动的selectionKey 防止重复
                Keyiterator.remove();
            }
        }
    }
}
Client端
package com.zhuangxiaoyan.nio.selector;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @Classname NIOClient
 * @Description TODO
 * @Date 2021/11/1 7:25
 * @Created by xjl
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        //创建一个网络通道
        SocketChannel socketChannel=SocketChannel.open();
        //设置非阻塞模式
        socketChannel.configureBlocking(false);
        //提供服务端的ip
        InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",6666);

        //连接服务器
        if (!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("因为连接需要,客户端不会阻塞,可以做其他工作");
            }
        }
        //如果成功
        String str="hell 庄小焱";
        ByteBuffer buffer=ByteBuffer.wrap(str.getBytes());
        //发送数据
        socketChannel.write(buffer);
        System.in.read();
    }
}

关注
打赏
1657692713
查看更多评论
立即登录/注册

微信扫码登录

0.2123s