Java NIO2 Asynchronous Channel APIs

前面 我们介绍了NIO中一系列同步非阻塞API的用法。在Java 7中引入了NIO的改进版NIO 2,NIO 2 也就是我们常说的 AIO,它是异步非阻塞的IO方式。

AIO的核心概念就是发起非阻塞方式的I/O操作,立即响应,却不立即返回结果,当I/O操作完成时通知。

Asynchronous Channel APIs 工作原理

从Java 7开始,java.nio.channel包中新增加4个异步Channel:

  • AsynchronousFileChannel
  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousDatagramChannel

从名字上,你可以感受到它们与同步非阻塞Channel API在风格上有些类似。而且,同步非阻塞Channel API中的诸多用法在新的异步非阻塞Channel中依然适用,不同之处在于,新的异步Channel能够允许一些操作异步执行

当操作启动时,异步通道API为我们提供了两种用于监视和控制挂起操作的备选方案。

  • 第一种通过返回一个 java.util.concurrent.Future对象来表示异步操作的结果
  • 第二种是通过传递给操作一个新类的对象 java.nio.channels.CompletionHandler来完成,它会定义在操作完成后所执行的处理方法。

Future方式

Future 对象代表了异步操作执行的结果。假设我们想要创建一个服务来监听客服端的连接,我们调用 AsynchronousServerSocketChannel 的静态方法 open(),并绑到到一个端口上,代码如下:

1
2
3
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel
.open()
.bind(new InetSocketAddress("127.0.0.1", 8899));

然后,调用 accept() 方法,返回 SocketChannel:

1
Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();

在标准的NIO中,如果我们调用ServerSocketChannel中的 accept() 方法,在设置了block mode为true的情况下,它将会一直阻塞,直到有客户端的连接进来。但是 AsynchronousSocketChannel的accept()方法,则会立即返回一个 Future 对象。

Future对象的泛型类型是操作的返回类型。 在上面的例子中,它是AsynchronousSocketChannel,但它也可以是Integer或String,具体取决于操作的最终返回类型。

检查是否完成
1
future.isDone();

如果底层的操作已经完成,则返回 true。 请注意,在这种情况下,完成可能意味着正常终止、异常或取消。

检查操作是否取消
1
future.isCancelled();

如果操作在正常完成之前被取消,则返回 true。

尝试取消操作
1
future.cancel(true);

如果任务已完成,已取消或由于某些其他原因无法取消,则此尝试将失败,返回false。 如果成功,并且在调用canel()接口时此任务尚未启动,则此任务永远不会运行。 如果任务已经开始,则由 mayInterruptIfRunning 参数确定是否应该在尝试停止任务时中断执行此任务的线程。

获取操作结果
1
2
3
4
AsynchronousSocketChannel client= future.get();

// 设置10s的阻塞时间
AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS);

如果我们在操作完成前调用此API,它将一直阻塞,直到操作完成并返回操作结果。

CompletionHandler 方式

使用Future来处理操作的替代方法是使用CompletionHandler类的回调机制。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);

listener.accept(
attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
public void completed(
AsynchronousSocketChannel client, Object attachment) {
// do whatever with client
}
public void failed(Throwable exc, Object attachment) {
// handle failure
}
});

当 I/O 操作完全成功时,completed() 接口将会被调用。如果操作失败,则调用 failed() 接口。

AsynchronousFileChannel

理解了前面我们讲的 Asynchronous Channel API 的使用方法,接下来,我们来介绍AsynchronousFileChannel的使用。

创建

1
2
3
4
5
Path path = Paths.get("/Users/wangwei/Desktop/test.txt");
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path,
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE);

第一个参数指定文件的路径,第二参数指定文件打开之后的操作方式,java.nio.file.StandardOpenOption 枚举中定义了一些列的操作方式。

读取文件

Future 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> operation = fileChannel.read(buffer, 0);

//run other code as operation continues in background

operation.get();

String fileContent = new String(buffer.array()).trim();
buffer.clear();
System.out.println(fileContent);

fileChannel.read() 方法,第一个参数 byteBuffer 用于存储读取的文件内容,第二参数指定起始的读取位置。该方法将会立即返回,不管文件是否读取完毕。

接下来,代码将继续往下执行,而与此同时read()操作还依旧在后台运行。当我们执行完其它代码之后,我们可以调用 get() 接口,如果读取操作已经完成,则get() 接口会立即返回结果,否则会一直阻塞直到操作完成。

CompletionHandler 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Path path = Paths.get("//Users/wangwei/VirtualBox VMs/Win10_1803_English_x64.iso");
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
fileChannel.read(buffer, 0, buffer,
new CompletionHandler<Integer, ByteBuffer>() {

@Override
public void completed(Integer result, ByteBuffer attachment) {
// TODO
// result is number of bytes read
// attachment is the buffer containing content
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
// TODO
}
});

上面的代码我们用到了另一个 read() 接口,使用了 CompletionHandler 类。它的第一个泛型参数表示操作的返回类型,在 AsynchronousFileChannel 这个例子则返回 Integer 类型,表示读取的字节数。

第二个泛型参数表示的是附加对象类型, 这里我们选择附加缓冲区,以便在读取完成时,我们可以在 completed 回调API中使用文件中的内容。

写入文件

同读取一样,我们也可以使用两种方式来写入文件内容,先来创建 AsynchronousFileChannel:

1
2
AsynchronousFileChannel fileChannel
= AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);

这里的操作类型设置为了 StandardOpenOption.WRITE

Future 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello world".getBytes());
buffer.flip();

Future<Integer> operation = fileChannel.write(buffer, 0);
buffer.clear();

//run other code as operation continues in background

operation.get();

CompletionHandler 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
AsynchronousFileChannel fileChannel =
AsynchronousFileChannel.open(path, StandardOpenOption.READ);

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("hello world".getBytes());
buffer.flip();

fileChannel.write(
buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {

@Override
public void completed(Integer result, ByteBuffer attachment) {
// result is number of bytes written
// attachment is the buffer
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
});

参考资料

请我喝杯咖啡吧~