AIO (Asynchronous I/O) 异步非阻塞I/O 是 Java 1.7 之后的,,在java.nio包,AIO是是 NIO 的升级版本(所以AIO又叫NIO.2),提供了异步非堵塞的 IO 操作方式,异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。
新增了许多支持异步的类,主要有:
- AsynchronousFileChannel类是异步的方式处理本地文件的文件通道。
- AsynchronousSocketChannel和AsynchronousServerSocketChannel两个类是javaAIO为TCP通信提供的异步Channel。
- AsynchronousChannelGroup是异步Channel的分组管理器,它可以实现资源共享。
异步文件通道传送门:Java7 Parh,Files和AIO中AsynchronousFileChannel
一、AsynchronousServerSocketChannel和AsynchronousSocketChannel类AsynchronousServerSocketChannel对象,类似于ServerSocket,它们的用法比较类似。
1、创建AsynchronousServerSocketChannel对象
-
-
abstract Future
accept()
接受一个连接。
abstract void
accept(A attachment, CompletionHandler handler)
接受一个连接。
AsynchronousServerSocketChannel
bind(SocketAddress local)
结合通道的插座到本地地址和配置套接字监听连接。
abstract AsynchronousServerSocketChannel
bind(SocketAddress local, int backlog)
结合通道的插座到本地地址和配置套接字监听连接。
abstract SocketAddress
getLocalAddress()
返回此通道的套接字绑定到的套接字地址。
static AsynchronousServerSocketChannel
open()
打开一个异步服务器套接字通道。
static AsynchronousServerSocketChannel
open(AsynchronousChannelGroup group)
打开一个异步服务器套接字通道。
-
直接调用open()方法,或者使用绑定线程池AsynchronousServerSocketChannel,创建AsynchronousChannelGroup时,需要传入一个ExecutorService,也就是绑定一个线程池。
该线程池负责两个任务:处理IO事件和触发CompletionHandler回调接口。
AsynchronousServerSocketChannel serverSocketChannel = null;
try {
ExecutorService executorService = Executors.newFixedThreadPool(80);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
}catch (IOException ioe){
ioe.printStackTrace();
}
AsynchronousServerSocketChannel对象,也是调用accept()方法来接受来自客户端的连接,
由于异步IO实际的IO操作是交给操作系统来做的,用户进程只负责通知操作系统进行IO和接受操作系统IO完成的通知。
所以异步的调用accept()方法后,当前线程不会阻塞,程序也不知道accept()方法什么时候能够接收到客户端请求并且操作系统完成网络IO,为解决这个问题,AIO为accept方法提供两个处理:
1. 通过 Future
通过Future对象的get()方法,但是get方法会阻塞该线程,所以这种方式是阻塞式的异步IO。
或者循环调用 Future 的 isDone() 方法直到返回 true。
while (!future.isDone()) {}
2. 通过 CompletionHandler
CompletionHandler接口中定义了两个方法:
- completed(V result , A attachment):当IO完成时触发该方法。
- faild(Throwable exc, A attachment):当IO失败时触发该方法
2、创建AsynchronousSocketChannel对象
AsynchronousSocketChannel对象,类似于Socket,它们的用法比较类似。
-
-
abstract AsynchronousSocketChannel
bind(SocketAddress local)
将信道的套接字绑定到本地地址。
abstract Future
connect(SocketAddress remote)
连接此通道。
abstract void
connect(SocketAddress remote, A attachment, CompletionHandler handler)
连接此通道。
abstract SocketAddress
getLocalAddress()
返回此通道的套接字绑定到的套接字地址。
abstract SocketAddress
getRemoteAddress()
返回此通道的套接字连接的远程地址。
static AsynchronousSocketChannel
open()
打开一个异步套接字通道。
static AsynchronousSocketChannel
open(AsynchronousChannelGroup group)
打开一个异步套接字通道。
-
注意:
1. 使用异步Channel时,accept()、connect()、read()、write()等方法都不会阻塞,每个方法可能会提供FutureCompletionHandler的处理方式。
2. 如果使用返回Future的这些方法,程序并不能知道什么时候成功IO,必须要使用get()或者isDone()() 方法,等方法的阻塞结束后才能确保IO完成,继续执行下面的操作。
二、 AIO异步网络编程 -
1、通过 Future(不推荐)
服务端:
public class SimpleServer {
public static void main(String[] args) {
try {
// 创建AsynchronousServerSocketChannel对象, 并绑定ip和端口
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
// 使用循环不断地接受来自客户端的连接
while(true){
Future future = serverSocketChannel.accept();
// 通过future.get方法获取到客户端对应的socketChannel
AsynchronousSocketChannel socketChannel = future.get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("服务端发送消息来了"));
buffer.flip();
// 异步模式下就不要使用循环写
socketChannel.write(buffer).get();
// 读数据
buffer.clear();
socketChannel.read(buffer).get();
buffer.flip();
System.out.println("来自客户端的信息:" + Charset.forName("UTF-8").decode(buffer).toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端:
public class SimpleClient {
public static void main(String[] args) {
try {
// 创建AsynchronousSocketChannel对象, 并向服务器发出连接请求
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞下
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999)).get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读数据
buffer.clear();
socketChannel.read(buffer).get();
buffer.flip();
System.out.println("来自服务器的信息:" + Charset.forName("UTF-8").decode(buffer).toString());
// 写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("客户端发送来了老弟"));
buffer.flip();
// 异步模式下就不要使用循环写
socketChannel.write(buffer).get();
} catch (Exception ioException) {
ioException.printStackTrace();
}
}
}
2、通过 CompletionHandler(推荐使用)
服务端:
public static void main(String[] args) {
try {
// 多线程版本
// ExecutorService executorService = Executors.newCachedThreadPool();
// AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
// serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
// 单线程版本
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9999));
serverSocketChannel.accept(null, new CompletionHandler() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
// 递归循环接受客户端的连接
serverSocketChannel.accept(null, this);
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 先读数据
buffer.clear();
socketChannel.read(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println("来自客户端的信息:" + Charset.forName("UTF-8").decode(buffer).toString());
try {
// 后写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("服务端收到消息了"));
buffer.flip();
// 异步模式,就不要使用循环写,把线程阻塞
socketChannel.write(buffer).get();
} catch (Exception e) {
System.out.println("write failed: " + e.getMessage());
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("read failed: " + exc.getMessage());
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("accept failed: " + exc.getMessage());
}
});
} catch (IOException ioException) {
ioException.printStackTrace();
}
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端:
public static void main(String[] args) {
try {
// 创建AsynchronousSocketChannel对象, 并向服务器发出连接请求
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞下
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999)).get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 先写数据
buffer.clear();
buffer.put(Charset.forName("UTF-8").encode("客户端发送, 来了老弟"));
buffer.flip();
// 异步模式,就不要使用循环写,把线程阻塞
socketChannel.write(buffer).get();
// 后读数据
buffer.clear();
socketChannel.read(buffer, buffer, new CompletionHandler() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println("来自服务器的信息:" + Charset.forName("UTF-8").decode(attachment).toString());
// 用来递归实现重复循环读取数据
buffer.clear();
socketChannel.read(buffer, null, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("read failed: " + exc.getMessage());
}
});
} catch (Exception ioException) {
ioException.printStackTrace();
}
}
这篇博主通过线程的实例代码不错,可以参考:IO模型之AIO代码及其实践详解
—— Stay Hungry. Stay Foolish. 求知若饥,虚心若愚。