AIO (Asynchronous I/O) 异步非阻塞I/O 是 Java 1.7 之后的,,在java.nio包,AIO是是 NIO 的升级版本(所以AIO又叫NIO.2),提供了异步非堵塞的 IO 操作方式,异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。
新增了许多支持异步的类,主要有:
- AsynchronousFileChannel类是异步的方式处理本地文件的文件通道。
- AsynchronousSocketChannel和AsynchronousServerSocketChannel两个类是javaAIO为TCP通信提供的异步Channel。
- AsynchronousChannelGroup是异步Channel的分组管理器,它可以实现资源共享。
异步文件通道传送门:Java7 Parh,Files和AIO中AsynchronousFileChannel
一、AsynchronousServerSocketChannel和AsynchronousSocketChannel类AsynchronousServerSocketChannel对象,类似于ServerSocket,它们的用法比较类似。
1、创建AsynchronousServerSocketChannel对象
-
-
abstract Futureaccept()接受一个连接。
abstract voidaccept(A attachment, CompletionHandler handler)接受一个连接。
AsynchronousServerSocketChannelbind(SocketAddress local)结合通道的插座到本地地址和配置套接字监听连接。
abstract AsynchronousServerSocketChannelbind(SocketAddress local, int backlog)结合通道的插座到本地地址和配置套接字监听连接。
abstract SocketAddressgetLocalAddress()返回此通道的套接字绑定到的套接字地址。
static AsynchronousServerSocketChannelopen()打开一个异步服务器套接字通道。
static AsynchronousServerSocketChannelopen(AsynchronousChannelGroup group)打开一个异步服务器套接字通道。
-
直接调用open()方法,或者使用绑定线程池AsynchronousServerSocketChannel,创建AsynchronousChannelGroup时,需要传入一个ExecutorService,也就是绑定一个线程池。
该线程池负责两个任务:处理IO事件和触发CompletionHandler回调接口。
AsynchronousServerSocketChannel serverSocketChannel = null;
try {
ExecutorService executorService = Executors.newFixedThreadPool(80);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
}catch (IOException ioe){
ioe.printStackTrace();
}
AsynchronousServerSocketChannel对象,也是调用accept()方法来接受来自客户端的连接,
由于异步IO实际的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接受操作系统IO完成的通知。
所以异步的调用accept()方法后,当前线程不会阻塞,程序也不知道accept()方法什么时候能够接收到客户端请求并且操作系统完成网络IO,为解决这个问题,AIO为accept方法提供两个处理:
1. 通过 Future
通过Future对象的get()方法,但是get方法会阻塞该线程,所以这种方式是阻塞式的异步IO。
或者循环调用 Future 的 isDone() 方法直到返回 true。
while (!future.isDone()) {}
2. 通过 CompletionHandler
CompletionHandler接口中定义了两个方法:
- completed(V result , A attachment):当IO完成时触发该方法。
- faild(Throwable exc, A attachment):当IO失败时触发该方法
2、创建AsynchronousSocketChannel对象
AsynchronousSocketChannel对象,类似于Socket,它们的用法比较类似。
-
-
abstract AsynchronousSocketChannelbind(SocketAddress local)将信道的套接字绑定到本地地址。
abstract Futureconnect(SocketAddress remote)连接此通道。
abstract voidconnect(SocketAddress remote, A attachment, CompletionHandler handler)连接此通道。
abstract SocketAddressgetLocalAddress()返回此通道的套接字绑定到的套接字地址。
abstract SocketAddressgetRemoteAddress()返回此通道的套接字连接的远程地址。
static AsynchronousSocketChannelopen()打开一个异步套接字通道。
static AsynchronousSocketChannelopen(AsynchronousChannelGroup group)打开一个异步套接字通道。
-
注意:
1. 使用异步Channel时,accept()、connect()、read()、write()等方法都不会阻塞,每个方法可能会提供FutureCompletionHandler的处理方式。
2. 如果使用返回Future的这些方法,程序并不能知道什么时候成功IO,必须要使用get()或者isDone()() 方法,等方法的阻塞结束后才能确保IO完成,继续执行下面的操作。
二、 AIO异步网络编程 -
1、通过 Future(不推荐)
服务端:
public class SimpleServer {
public static void main(String[] args) {
try {
// 创建AsynchronousServerSocketChannel对象, 并绑定ip和端口
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
// 使用循环不断地接受来自客户端的连接
while(true){
Future future = serverSocketChannel.accept();
// 通过future.get方法获取到客户端对应的socketChannel
AsynchronousSocketChannel socketChannel = future.get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("服务端发送消息来了"));
buffer.flip();
// 异步模式下就不要使用循环写
socketChannel.write(buffer).get();
// 读数据
buffer.clear();
socketChannel.read(buffer).get();
buffer.flip();
System.out.println("来自客户端的信息:" + Charset.forName("UTF-8").decode(buffer).toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端:
public class SimpleClient {
public static void main(String[] args) {
try {
// 创建AsynchronousSocketChannel对象, 并向服务器发出连接请求
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞下
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999)).get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读数据
buffer.clear();
socketChannel.read(buffer).get();
buffer.flip();
System.out.println("来自服务器的信息:" + Charset.forName("UTF-8").decode(buffer).toString());
// 写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("客户端发送来了老弟"));
buffer.flip();
// 异步模式下就不要使用循环写
socketChannel.write(buffer).get();
} catch (Exception ioException) {
ioException.printStackTrace();
}
}
}
2、通过 CompletionHandler(推荐使用)
服务端:
public static void main(String[] args) {
try {
// 多线程版本
// ExecutorService executorService = Executors.newCachedThreadPool();
// AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
// serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
// 单线程版本
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9999));
serverSocketChannel.accept(null, new CompletionHandler() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
// 递归循环接受客户端的连接
serverSocketChannel.accept(null, this);
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 先读数据
buffer.clear();
socketChannel.read(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println("来自客户端的信息:" + Charset.forName("UTF-8").decode(buffer).toString());
try {
// 后写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("服务端收到消息了"));
buffer.flip();
// 异步模式,就不要使用循环写,把线程阻塞
socketChannel.write(buffer).get();
} catch (Exception e) {
System.out.println("write failed: " + e.getMessage());
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("read failed: " + exc.getMessage());
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("accept failed: " + exc.getMessage());
}
});
} catch (IOException ioException) {
ioException.printStackTrace();
}
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端:
public static void main(String[] args) {
try {
// 创建AsynchronousSocketChannel对象, 并向服务器发出连接请求
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞下
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999)).get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 先写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("客户端发送, 来了老弟"));
buffer.flip();
// 异步模式,就不要使用循环写,把线程阻塞
socketChannel.write(buffer).get();
// 后读数据
buffer.clear();
socketChannel.read(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println("来自服务器的信息:" + Charset.forName("UTF-8").decode(attachment).toString());
// 用来递归实现重复循环读取数据
buffer.clear();
socketChannel.read(buffer, null, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("read failed: " + exc.getMessage());
}
});
} catch (Exception ioException) {
ioException.printStackTrace();
}
}
这篇博主通过线程的实例代码不错,可以参考:IO模型之AIO代码及其实践详解
—— Stay Hungry. Stay Foolish. 求知若饥,虚心若愚。
