您当前的位置: 首页 >  面试

终结全网!手写Netty面试题答案

发布时间:2021-05-27 16:57:32 ,浏览量:0

1 最原始架构

一个线程负责处理连接、读写等各种请求。

创建一个线程,注册到 Selector,将 serversocketchannel 注册到Selector selectionKey 里就有具体的事件

对应代码

package io.netty.example.helloworld; import io.netty.channel.EventLoopGroup; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.Set; /**
 * @author JavaEdge
 * @date 2021/5/17
 */ public class NioServer { public static void main(String[] args) throws Exception { // 创建一个 ServerSocketChannel ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(8080)); // 设置为非阻塞模式 serverChannel.configureBlocking(false); // 创建一个事件查询器 Selector selector = SelectorProvider.provider().openSelector(); // 把 ServerSocketChannel 注册到 selector,并且感兴趣 OP_ACCEPT 事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 阻塞方法,等待系统有I/O事件发生 int eventNum = selector.select(); System.out.println("系统发生IO事件 数量->" + eventNum); Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iterable = keySet.iterator(); while (iterable.hasNext()) { // 拿到该 key SelectionKey key = iterable.next(); // 拿到后就移除它,否则后面遍历还会重复拿到它 iterable.remove(); // 连接事件 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接受客户端的连接,一个 SocketChannel 代表一个TCP连接 SocketChannel socketChannel = ssc.accept(); // 把SocketChannel设置为非阻塞模式 socketChannel.configureBlocking(false); System.out.println("服务器接受了一个新的连接 " + socketChannel.getRemoteAddress()); } } } } } 
package io.netty.example.helloworld; import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Arrays; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /**
 * @author JavaEdge
 * @date 2021/5/17
 */ public class NioServer { public static void main(String[] args) throws Exception { //创建一个ServerSocket ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(8089)); //设置为非阻塞模式 serverChannel.configureBlocking(false); // 创建一个事件查询器 Selector selector = SelectorProvider.provider().openSelector(); // 把 ServerSocketChannel 注册到事件查询器上,并且感兴趣 OP_ACCEPT  事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); // //        //创建一组事件查询器 //        EventLoopGroup eventLoopGroup = new EventLoopGroup(); while (true) { // 阻塞方法,等待系统有I/O事件发生 int eventNum = selector.select(); System.out.println("系统发生IO事件 数量->" + eventNum); Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iterable = keySet.iterator(); while (iterable.hasNext()) { // 拿到该 key SelectionKey key = iterable.next(); // 拿到后就移除它,否则后面遍历还会重复拿到它 iterable.remove(); // 连接事件 if (key.isAcceptable()) { // 因为只有 ServerSocketChannel 有接收事件,所以可直接强转 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接受客户端的连接,一个 SocketChannel 代表一个TCP连接 // 事件如果发生了,就肯定有新的连接 SocketChannel socketChannel = ssc.accept(); // 把SocketChannel设置为非阻塞模式 socketChannel.configureBlocking(false); System.out.println("服务器接受了一个新的连接 " + socketChannel.getRemoteAddress()); // 把SocketChannel注册到Selector,并关注OP_READ事件 socketChannel.register(selector, SelectionKey.OP_READ); //                    eventLoopGroup.register(socketChannel, SelectionKey.OP_READ); } // 可读事件 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); try { int readNum = socketChannel.read(buffer); if (readNum == -1) { System.out.println("读取结束,关闭 socket"); key.channel(); socketChannel.close(); break; } // 将Buffer从写模式切到读模式 buffer.flip(); byte[] bytes = new byte[readNum]; buffer.get(bytes, 0, readNum); System.out.println(new String(bytes)); /*                        byte[] response = "client hello".getBytes();
                        // 清理了才可以重新使用
                        buffer.clear();
                        buffer.put(response);
                        buffer.flip();
                        // 该方法非阻塞的,如果此时无法写入也不会阻塞在此,而是直接返回 0 了
                        socketChannel.write(buffer);

                        */ // 在 key 上附加一个对象 key.attach("hello client".getBytes()); // 把 key 关注的事件切换为写 key.interestOps(SelectionKey.OP_WRITE); } catch (IOException e) { System.out.println("读取时发生异常,关闭 socket"); // 取消 key key.channel(); } } if (key.isWritable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); // 可写时再将那个对象拿出来 byte[] bytes = (byte[]) key.attachment(); key.attach(null); System.out.println("可写事件发生 写入消息" + Arrays.toString(bytes)); if (bytes != null) { socketChannel.write(ByteBuffer.wrap(bytes)); } // 写完后,就不需要写了,就切换为读事件   如果不写该行代码就会死循环 //                    key.interestOps(SelectionKey.OP_READ); } } } } } 

2 接收请求单独处理
  • 架构图
2.1 死锁案例
package io.netty.example.helloworld; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; import java.util.Set; /**
 * @author JavaEdge
 * @date 2021/5/17
 */ public class NioServer { public static void main(String[] args) throws Exception { // 创建一个ServerSocket ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(8089)); // 设置为非阻塞模式 serverChannel.configureBlocking(false); // 创建一个事件查询器 Selector selector = SelectorProvider.provider().openSelector(); // 把 ServerSocketChannel 注册到事件查询器上,并且感兴趣 OP_ACCEPT  事件 serverChannel.register(selector, SelectionKey.OP_ACCEPT); EventLoop eventLoop = new EventLoop(); while (true) { // 阻塞方法,等待系统有I/O事件发生 int eventNum = selector.select(); System.out.println("系统发生IO事件 数量->" + eventNum); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { // 拿到该 key SelectionKey key = keyIterator.next(); // 拿到后就移除它,否则后面遍历还会重复拿到它 keyIterator.remove(); // 只需处理【连接事件】 a connection was accepted by a ServerSocketChannel. if (key.isAcceptable()) { // 因为只有 ServerSocketChannel 有接收事件,所以可直接强转 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接受客户端的连接,一个 SocketChannel 代表一个TCP连接 // 事件如果发生了,就肯定有新的连接 SocketChannel socketChannel = ssc.accept(); // 把SocketChannel设置为非阻塞模式 socketChannel.configureBlocking(false); System.out.println("服务器接受了一个新的连接 " + socketChannel.getRemoteAddress()); // 把SocketChannel注册到Selector,并关注OP_READ事件 // socketChannel.register(selector, SelectionKey.OP_READ); eventLoop.register(socketChannel, SelectionKey.OP_READ); } } } } } 
package io.netty.example.helloworld; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Arrays; import java.util.Iterator; import java.util.Set; /**
 * @author JavaEdge
 * @date 2021/5/25
 */ public class EventLoop implements Runnable { private Selector selector; private Thread thread; public EventLoop() throws IOException { this.selector = SelectorProvider.provider().openSelector(); this.thread = new Thread(this); this.thread.start(); } /**
     * 把 channel 注册到 事件查询器
     */ public void register(SocketChannel channel, int keyOps) throws ClosedChannelException { channel.register(selector, keyOps); } @Override public void run() { while (!Thread.interrupted()) { try { // 阻塞方法,等待系统有 I/0 事件产生 int eventNum = selector.select(); System.out.println("系统发生IO事件 数量->" + eventNum); Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iterable = keySet.iterator(); while (iterable.hasNext()) { SelectionKey key = iterable.next(); iterable.remove(); // 可读事件 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); try { int readNum = socketChannel.read(buffer); if (readNum == -1) { System.out.println("读取结束,关闭 socket"); key.channel(); socketChannel.close(); break; } // 将Buffer从写模式切到读模式 buffer.flip(); byte[] bytes = new byte[readNum]; buffer.get(bytes, 0, readNum); System.out.println(new String(bytes)); /*                        byte[] response = "client hello".getBytes();
                        // 清理了才可以重新使用
                        buffer.clear();
                        buffer.put(response);
                        buffer.flip();
                        // 该方法非阻塞的,如果此时无法写入也不会阻塞在此,而是直接返回 0 了
                        socketChannel.write(buffer);

                        */ // 在 key 上附加一个对象 key.attach("EventLoop says hello to client".getBytes()); // 把 key 关注的事件切换为写 key.interestOps(SelectionKey.OP_WRITE); } catch (IOException e) { System.out.println("读取时发生异常,关闭 socket"); // 取消 key key.channel(); } } if (key.isWritable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); // 可写时再将那个对象拿出来 byte[] bytes = (byte[]) key.attachment(); key.attach(null); System.out.println("可写事件发生 写入消息" + Arrays.toString(bytes)); if (bytes != null) { socketChannel.write(ByteBuffer.wrap(bytes)); } // 写完后,就不需要写了,就切换为读事件   如果不写该行代码就会死循环 key.interestOps(SelectionKey.OP_READ); } } } catch (IOException e) { e.printStackTrace(); } } } } 

启动之后,开启一个客户端连接请求: 打断点到该行代码: 点击继续执行时,dump此时的线程状态:主线程已经阻塞在此 说明主线程在等待 @574线程的锁,它是谁呢?没错 EventLoop 线程阻塞在select 方法,而且它此时已经获取了Selector 内部的一把锁,所以不是Blocked状态。 但此时主线程执行 register 也需要该Selector内部的这把锁,但又不是同一线程,所以产生死锁。 所以不能由main 线程调用注册方法。

2.2 解决死锁

改造后的 EventLoop 类:

package io.netty.example.helloworld; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Arrays; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.LinkedBlockingDeque; /**
 * @author JavaEdge
 * @date 2021/5/25
 */ public class EventLoop implements Runnable { private Selector selector; private Thread thread; private Queue<Runnable> taskQueue = new LinkedBlockingDeque<>(32); public EventLoop() throws IOException { this.selector = SelectorProvider.provider().openSelector(); this.thread = new Thread(this); this.thread.start(); } /**
     * 把 channel 注册到 事件查询器
     */ public void register(SocketChannel channel, int keyOps) { // 将注册的逻辑封装成一个任务,因为不能让主线程执行,必须由 eventloop 的线程执行 taskQueue.add(() -> { try { channel.register(selector, keyOps); } catch (ClosedChannelException e) { e.printStackTrace(); } }); // 但此时EventLoop的线程阻塞在 selector.select(),通过主线程唤醒它 selector.wakeup(); } @Override public void run() { while (!Thread.interrupted()) { try { System.out.println(thread + "开始查询 I/O 事件..."); // 阻塞方法,等待系统有 I/0 事件产生 int eventNum = selector.select(); System.out.println("系统发生IO事件 数量->" + eventNum); // 有事件则处理 if (eventNum > 0) { Set<SelectionKey> keySet = selector.selectedKeys(); Iterator<SelectionKey> iterable = keySet.iterator(); while (iterable.hasNext()) { SelectionKey key = iterable.next(); iterable.remove(); // 可读事件 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); try { int readNum = socketChannel.read(buffer); if (readNum == -1) { System.out.println("读取结束,关闭 socket"); key.channel(); socketChannel.close(); break; } // 将Buffer从写模式切到读模式 buffer.flip(); byte[] bytes = new byte[readNum]; buffer.get(bytes, 0, readNum); System.out.println(new String(bytes)); /*                        byte[] response = "client hello".getBytes();
                        // 清理了才可以重新使用
                        buffer.clear();
                        buffer.put(response);
                        buffer.flip();
                        // 该方法非阻塞的,如果此时无法写入也不会阻塞在此,而是直接返回 0 了
                        socketChannel.write(buffer);

                        */ // 在 key 上附加一个对象 key.attach("EventLoop says hello to client".getBytes()); // 把 key 关注的事件切换为写 key.interestOps(SelectionKey.OP_WRITE); } catch (IOException e) { System.out.println("读取时发生异常,关闭 socket"); // 取消 key key.channel(); } } if (key.isWritable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); // 可写时再将那个对象拿出来 byte[] bytes = (byte[]) key.attachment(); key.attach(null); System.out.println("可写事件发生 写入消息" + Arrays.toString(bytes)); if (bytes != null) { socketChannel.write(ByteBuffer.wrap(bytes)); } // 写完后,就不需要写了,就切换为读事件   如果不写该行代码就会死循环 key.interestOps(SelectionKey.OP_READ); } } } // 无事件则执行任务 Runnable task; while ((task = taskQueue.poll()) != null) { // EventLoop执行队列中的任务,即注册任务 task.run(); } } catch (IOException e) { e.printStackTrace(); } } } } 
3 EventLoopGroup

由于只使用一个 Selector 来处理客户端的读写请求,如果并发太大,太多 socketchannel,这个死循环就可能处理不过来,造成大量请求超时。 所以有了EventLoopGroup。 且 channel 负责读、写事件的处理。

package io.netty.example.helloworld; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; /**
 * 类似 netty 的 channel
 *
 * @author JavaEdge
 *
 * @date 2021/5/27
 */ public class MyChannel { private SocketChannel channel; private EventLoop eventLoop; /**
     * 写数据的缓冲区
     */ private Queue<ByteBuffer> writeQueue = new ArrayBlockingQueue<>(16); public MyChannel(SocketChannel channel,EventLoop eventLoop) { this.channel = channel; this.eventLoop = eventLoop; } public void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); try { int readNum = socketChannel.read(buffer); if (readNum == -1) { System.out.println("读取结束,关闭 socket"); key.channel(); socketChannel.close(); return; } // 将Buffer从写模式切到读模式 buffer.flip(); byte[] bytes = new byte[readNum]; // 客户端发来的数据 buffer.get(bytes, 0, readNum); String clientData = new String(bytes); System.out.println(clientData); // 加入写缓冲区 writeQueue.add(ByteBuffer.wrap("hello JavaEdge".getBytes())); if ("flush".equals(clientData)) { // 把 key 关注的事件切换为写 key.interestOps(SelectionKey.OP_WRITE); } } catch (IOException e) { System.out.println("读取时发生异常,关闭 socket"); // 取消 key key.channel(); socketChannel.close(); } } public void write(SelectionKey key) throws IOException { ByteBuffer byteBuffer; while ((byteBuffer = writeQueue.poll()) != null) { channel.write(byteBuffer); } // 写完后,就不需要写了,就切换为读事件   如果不写该行代码就会死循环 key.interestOps(SelectionKey.OP_READ); } } 
package io.netty.example.helloworld; import java.io.IOException; import java.nio.channels.SocketChannel; import java.util.concurrent.atomic.AtomicInteger; /**
 * @author JavaEdge
 * @date 2021/5/25
 */ public class EventLoopGroup { private EventLoop[] eventLoops = new EventLoop[2]; private final AtomicInteger idx = new AtomicInteger(0); public EventLoop next() { // 轮询算法 return eventLoops[idx.getAndIncrement() & eventLoops.length - 1]; } public EventLoopGroup() throws IOException { for (int i = 0; i < eventLoops.length; i++) { eventLoops[i] = new EventLoop(); } } /**
     * 其实啥也不干,直接找到一个EventLoop,丢给他干
     */ public void register(SocketChannel channel, int keyOps) { next().register(channel, keyOps); } } 
canceled key

启动程序,客户端发起连接请求,然后点击断开连接

报错如下: 因为点击断开连接时,是会产生一个读事件请求,而这时会将该 channel 关闭

  • 这里加一行即可
pipeline

我们需要将业务代码抽出来,给业务开发人员使用。基本上开发人员只需要写各个编解码器即可。

package io.netty.example.helloworld; import lombok.extern.slf4j.Slf4j; /**
 * @author JavaEdge
 * @date 2021/5/28
 */ @Slf4j public class MyHandler2 implements Handler { @Override public void channelRead(HandlerContext ctx, Object msg) { // 上一个处理器解码成 String 了,所以这里直接转型处理 String String string = (String) msg; // 处理业务 log.debug(string); // 传给 handler2 ctx.getMyChannel().doWrite("hello client"); if ("flush".equals(string)) { /**
             * 这样调用,会跳过而不调用 handler2 的 flush 方法
             * 若还需要调用 handler2 的 flush 方法,应该通过 channel 调用:
             *          ctx.getMyChannel().flush();
             */ ctx.flush(); } } @Override public void write(HandlerContext ctx, Object msg) { log.debug("msg=" + msg); msg += "!!!"; // 传递给 handler1 ctx.write(msg); } @Override public void flush(HandlerContext ctx) { log.debug("flush"); // 调用 handler1 ctx.flush(); } } 
package io.netty.example.helloworld; import lombok.extern.slf4j.Slf4j; import java.nio.ByteBuffer; /**
 * @author JavaEdge
 * @date 2021/5/28
 */ @Slf4j public class PipeLine { private MyChannel myChannel; private EventLoop eventLoop; HandlerContext headCtx; HandlerContext tailCtx; public PipeLine(MyChannel myChannel, EventLoop eventLoop) { this.myChannel = myChannel; this.eventLoop = eventLoop; PileLineHandler pileLineHandler = new PileLineHandler(); this.headCtx = new HandlerContext(pileLineHandler, myChannel); this.tailCtx = new HandlerContext(pileLineHandler, myChannel); // 构建初始化的链表 this.headCtx.next = this.tailCtx; this.tailCtx.prev = this.headCtx; } class PileLineHandler implements Handler { @Override public void channelRead(HandlerContext ctx, Object msg) { log.debug(msg.toString()); log.info("tail handler" + msg); } /**
         * 因为写数据是从后往前处理,所以最终到该处理器,必须要调用 channel 执行底层的写数据到 socket
         */ @Override public void write(HandlerContext ctx, Object msg) { log.debug(msg.toString()); // 既然是写底层,那就必须是 ByteBuffer 类型 if (!(msg instanceof ByteBuffer)) { throw new RuntimeException("error class type" + msg.getClass()); } // 类型符合,则加入到 channel 的缓冲区队列 PipeLine.this.myChannel.addWriteQueue((ByteBuffer) msg); } /**
         * 上边的 write 方法也只是将数据写到 channel 的临时缓冲区队列,并没有真正写进socket 输出
         * 当客户端调用了 flush 才真正的写数据出去。
         */ @Override public void flush(HandlerContext ctx) { log.debug("flush"); // 最后是由 pipeline 和 channel 交互写的数据 PipeLine.this.myChannel.doFlush(); } } /**
     * 仅演示添加到链尾
     *
     * @param handler
     */ public void addLast(Handler handler) { HandlerContext ctx = new HandlerContext(handler, myChannel); HandlerContext prev = this.tailCtx.prev; prev.next = ctx; ctx.prev = prev; ctx.next = this.tailCtx; tailCtx.prev = ctx; } } 
关注
打赏
1688896170
查看更多评论

暂无认证

  • 0浏览

    0关注

    115984博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0946s