Java NIO Selector

Selector(选择器)是一个Java NIO组件,它可以检查一个或多个NIO通道,并确定哪些通道已经准备好了相应的操作(例如,读写操作)。 这就是单个线程可以管理多个通道,从而管理多个连接的原因。

为什么要用Selector

单个线程能够处理多个通道的优势在于你只需要少数的线程就可以处理许多的通道。 实际上,你只需使用一个线程就可以处理所有通道。对于操作系统而言,在线程之间切换是非常消耗资源的,并且每个线程也占用操作系统中的一些资源(例如:内存)。 因此,使用的线程越少越好。

但是有一点要记住,现代操作系统和多核CPU在多任务处理方面表现的越来越好了,因此随着时间的推移,多线程的开销会变得越来越小。 实际上,如果CPU有多个内核,如果不使用任务处理机制,就会导致CPU资源的浪费。不管怎么说,有关程序设计的讨论属于另外的范畴了,但在我们这里可以确切地说,你可以通过Selector使用单个线程来处理多个Channels。示意图:

selector

要使用选择器,我们不需要任何特殊设置。 我们需要的所有类都是核心的java.nio包,我们只需要导入我们需要的类即可。

之后,我们可以使用Selector注册多个通道。 当任何通道上发生I / O活动时,选择器会通知我们。这就是我们从单个线程中读取大量数据源的方式。

我们向Selector注册的任何Channel必须是 SelectableChannel 的子类。 这些是一种特殊类型的通道,可以设置为非阻塞模式。

创建Selector

直接调用 open() 方法:

1
Selector selector = Selector.open();

Selector注册Channel

一个selector想要监控多个channel,我们必须将这些channels注册到selector上。调用 SelectableChannel 一类的 register() 方法就可以实行注册机制。

在注册channel之前,我们必须处理非阻塞(non-blocking)模式:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

这意味着我们不能将FileChannel与选择器一起使用,因为它们不能像我们使用套接字通道那样切换到非阻塞模式。

第一个参数是我们之前创建的Selector对象,第二个参数定义了一个”interest set”,这意味着我们感兴趣的Event(事件)是通过Selector(选择器)在被监视的Channel(通道)中监听的。

我们可以监听四种不同的Event,每种Event都由SelectionKey类中的常量表示:

  • Connect — 当一个客户端尝试去连接服务端时。常量定义: SelectionKey.OP_CONNECT
  • Accept — 当服务端接受来自客户端的连接请求时。常量定义:SelectionKey.OP_ACCEPT
  • Read — 当服务端准备读取channel中的数据时。常量定义:SelectionKey.OP_READ
  • Write — 当服务端准备写入数据到channel中时。常量定义:SelectionKey.OP_WRITE

如果对多个Event感兴趣时,对这些常量进行 OR 操作:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey’s

当我们使用选择器注册一个通道时,我们会得到一个 SelectionKey 对象。 该对象中包含了Channel注册的相关数据。

它包含一些重要的属性,我们必须很好地理解它们才能在Channel上使用Selector。 我们将在以下小节中详解这些属性。

The Interest Set

Interset Set(兴趣集)定义了我们希望Selector在Channel上监听的事件集。它是一个整数值,我们可以通过以下方式获取此信息。

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

你可以通过 interestSet 与 SelectionKey 中某个事件的常量进行 AND 操作,来判断某个Event是否处于Interset Set中。

The Ready Set

Ready Set定义了Channel准备好的一组操作。主要是在Selection后访问 Ready Set。Selection 将在后面解释。 你可以像这样访问ready set:

1
int readySet = selectionKey.readyOps();

你可以使用与Interest Set相同的方式,来判断Channel准备好了哪些Events/Operations。但是,你也可以使用下面四种方式:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

The Channel

SelectionKey 获取被监视的Channel非常简单,如下:

1
SelectableChannel channel = selectionKey.channel();

The Selector

同Channel一样,获取Selector也非常简单,如下:

1
Selector selector = selectionKey.selector();

Attaching Objects

我们可以附加一个对象到SelectionKey上去。有时候我们想要给Channel一个自定义ID或者附加我们想要追踪的任何类型的Java对象。

1
2
3
key.attach(Object);

Object object = key.attachment();

或者,我们可以选择在Channle注册期间附加对象。 我们将它作为第三个参数添加到Channel的 register() 方法中,如下所示:

1
SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT, object);

Selecting Channels via a Selector

使用Selector注册一个或多个Channel后,你可以调用 Selector.select() 方法,该方法会返回那些已经为监听你感兴趣的事件(connect、accept、read、write)做好准备的Channel的数量。换句话说,如果你对为reading事件做好准备的Channel感兴趣,你将会从 select() 方法中收到为reading事件做好准备的channel的数量。

select() 相关的方法如下:

  • int select()

    该方法会一直阻塞到至少一个通道准备好进行操作。 返回的整数表示其通道已准备好进行操作的键的数量。

  • int select(long timeout)

    与select()类似,区别在于它设置了最大的阻塞时间。

  • int selectNow()

    立即返回已经准备好的channel数量,没有则返回0。

select() 返回的int值表示了准备好了的Channel数量。也就是说,自上次调用select() 以来已准备好多少个Channel。如果您调用 select() 并且返回1,表示一个Channel已准备就绪,并且再次调用 select(),并且另一个Channel 已准备好,它将再次返回1。如果你没有对第一个准备好的Channel做任何操作,那你现在有2个就绪的Channel,但在每次 select() 调用之间只有一个Channnel已准备就绪。

selectedKeys()

一旦调用了其中一种 select() 方法,并且返回了int值,表示一个或多个Channel已准备就绪。你可以通过”selected key set”来访问到这些已经准备就绪的Channel。而”selected key set”可以通过调用 selector.selectedKeys() 方法获得。如下:

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();

你可以通过迭代的方式获取已经准备好的Channel,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

注意到每次迭代后面的一个方法:keyIterator.remove();。Selector 并不会从 SelectionKey Set 中主动删除 SelectionKey 实例。当我们处理完Channel之后,我们必须手动将其删除。等到Channel再一次准备就绪时,Selector又会将其添加进 SelectionKey Set 中去。

SelectionKey.channel() 方法返回的Channel对象需要进行强制转换为你需要的类型。例如:ServerSocketChannelSocketChannel等等。

Close()

当你用完selector之后,需要调用 selector.close(); 将会该selector并让所有注册的SelectionKey变得无效。

不过,Channel并不会关闭。

完整示例

我们来实现一个Echo服务。

EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package one.wangwei.java.nio;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class EchoServer {

public static void main(String[] args) throws Exception {

ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8900));
serverSocket.configureBlocking(false);

Selector selector = Selector.open();
serverSocket.register(selector, SelectionKey.OP_ACCEPT);

ByteBuffer byteBuffer = ByteBuffer.allocate(256);

while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();

if (selectionKey.isAcceptable()) {
register(selector, serverSocket);
}

if (selectionKey.isReadable()) {
answerWithEcho(byteBuffer, selectionKey);
}
iterator.remove();
}
}
}

private static void register(Selector selector, ServerSocketChannel socketChannel) throws IOException {
SocketChannel client = socketChannel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}

private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
buffer.flip();
client.write(buffer);
buffer.clear();
}

public static Process start() throws IOException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();
ProcessBuilder processBuilder = new ProcessBuilder(javaBin, "-cp", classpath, className);
return processBuilder.start();
}

}

EchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package one.wangwei.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class EchoClient {

private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;

private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8900));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}

public static EchoClient start() {
if (instance == null) {
instance = new EchoClient();
}
return instance;
}

public static void stop() throws IOException {
buffer = null;
client.close();
}

public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}

public static void main(String[] args) {
EchoClient client = EchoClient.start();
System.out.println("Enter your message:");
Scanner scanner = new Scanner(System.in);
client.sendMessage(scanner.nextLine());
}
}

参考资料

请我喝杯咖啡吧~