Selector (选择器) 是一个 Java NIO 组件,它可以检查一个或多个 NIO 通道,并确定哪些通道已经准备好了相应的操作 (例如,读写操作)。 这就是单个线程可以管理多个通道,从而管理多个连接的原因。

为什么要用 Selector

单个线程能够处理多个通道的优势在于你只需要少数的线程就可以处理许多的通道。 实际上,你只需使用一个线程就可以处理所有通道。对于操作系统而言,在线程之间切换是非常消耗资源的,并且每个线程也占用操作系统中的一些资源(例如:内存)。 因此,使用的线程越少越好。

但是有一点要记住,现代操作系统和多核 CPU 在多任务处理方面表现的越来越好了,因此随着时间的推移,多线程的开销会变得越来越小。 实际上,如果 CPU 有多个内核,如果不使用任务处理机制,就会导致 CPU 资源的浪费。不管怎么说,有关程序设计的讨论属于另外的范畴了,但在我们这里可以确切地说,你可以通过 Selector 使用单个线程来处理多个 Channels。示意图:

selector

要使用选择器,我们不需要任何特殊设置。 我们需要的所有类都是核心的 java.nio 包,我们只需要导入我们需要的类即可。

之后,我们可以使用 Selector 注册多个通道。 当任何通道上发生 I / O 活动时,选择器会通知我们。这就是我们从单个线程中读取大量数据源的方式。

我们向 Selector 注册的任何 Channel 必须是 SelectableChannel 的子类。 这些是一种特殊类型的通道,可以设置为非阻塞模式。

创建 Selector

直接调用 open() 方法:

1
Selector selector = Selector.open();

Selector 注册 Channel

一个 selector 想要监控多个 channel,我们必须将这些 channels 注册到 selector 上。调用 SelectableChannel 一类的 register() 方法就可以实行注册机制。

在注册 channel 之前,我们必须处理非阻塞(non-blocking)模式:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

这意味着我们不能将 FileChannel 与选择器一起使用,因为它们不能像我们使用套接字通道那样切换到非阻塞模式。

第一个参数是我们之前创建的 Selector 对象,第二个参数定义了一个”interest set”,这意味着我们感兴趣的 Event (事件) 是通过 Selector (选择器) 在被监视的 Channel (通道) 中监听的。

我们可以监听四种不同的 Event,每种 Event 都由 SelectionKey 类中的常量表示:

  • Connect — 当一个客户端尝试去连接服务端时。常量定义: SelectionKey.OP_CONNECT
  • Accept — 当服务端接受来自客户端的连接请求时。常量定义:SelectionKey.OP_ACCEPT
  • Read — 当服务端准备读取 channel 中的数据时。常量定义:SelectionKey.OP_READ
  • Write — 当服务端准备写入数据到 channel 中时。常量定义:SelectionKey.OP_WRITE

如果对多个 Event 感兴趣时,对这些常量进行 OR 操作:

1
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;    

SelectionKey’s

当我们使用选择器注册一个通道时,我们会得到一个 SelectionKey 对象。 该对象中包含了 Channel 注册的相关数据。

它包含一些重要的属性,我们必须很好地理解它们才能在 Channel 上使用 Selector。 我们将在以下小节中详解这些属性。

The Interest Set

Interset Set (兴趣集) 定义了我们希望 Selector 在 Channel 上监听的事件集。它是一个整数值,我们可以通过以下方式获取此信息。

1
2
3
4
5
6
int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

你可以通过 interestSet 与 SelectionKey 中某个事件的常量进行 AND 操作,来判断某个 Event 是否处于 Interset Set 中。

The Ready Set

Ready Set 定义了 Channel 准备好的一组操作。主要是在 Selection 后访问 Ready Set。Selection 将在后面解释。 你可以像这样访问 ready set:

1
int readySet = selectionKey.readyOps();

你可以使用与 Interest Set 相同的方式,来判断 Channel 准备好了哪些 Events/Operations。但是,你也可以使用下面四种方式:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

The Channel

SelectionKey 获取被监视的 Channel 非常简单,如下:

1
SelectableChannel channel = selectionKey.channel();

The Selector

同 Channel 一样,获取 Selector 也非常简单,如下:

1
Selector selector = selectionKey.selector();

Attaching Objects

我们可以附加一个对象到 SelectionKey 上去。有时候我们想要给 Channel 一个自定义 ID 或者附加我们想要追踪的任何类型的 Java 对象。

1
2
3
key.attach(Object);

Object object = key.attachment();

或者,我们可以选择在 Channle 注册期间附加对象。 我们将它作为第三个参数添加到 Channel 的 register() 方法中,如下所示:

1
SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT, object);

Selecting Channels via a Selector

使用 Selector 注册一个或多个 Channel 后,你可以调用 Selector.select() 方法,该方法会返回那些已经为监听你感兴趣的事件(connect、accept、read、write)做好准备的 Channel 的数量。换句话说,如果你对为 reading 事件做好准备的 Channel 感兴趣,你将会从 select() 方法中收到为 reading 事件做好准备的 channel 的数量。

select() 相关的方法如下:

  • int select()

    该方法会一直阻塞到至少一个通道准备好进行操作。 返回的整数表示其通道已准备好进行操作的键的数量。

  • int select(long timeout)

    与 select () 类似,区别在于它设置了最大的阻塞时间。

  • int selectNow()

    立即返回已经准备好的 channel 数量,没有则返回 0。

select() 返回的 int 值表示了准备好了的 Channel 数量。也就是说,自上次调用 select() 以来已准备好多少个 Channel。如果您调用 select() 并且返回 1,表示一个 Channel 已准备就绪,并且再次调用 *select ()*,并且另一个 Channel 已准备好,它将再次返回 1。如果你没有对第一个准备好的 Channel 做任何操作,那你现在有 2 个就绪的 Channel,但在每次 select() 调用之间只有一个 Channnel 已准备就绪。

selectedKeys()

一旦调用了其中一种 select() 方法,并且返回了 int 值,表示一个或多个 Channel 已准备就绪。你可以通过”selected key set” 来访问到这些已经准备就绪的 Channel。而”selected key set” 可以通过调用 selector.selectedKeys() 方法获得。如下:

1
Set<SelectionKey> selectedKeys = selector.selectedKeys();    

你可以通过迭代的方式获取已经准备好的 Channel,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

注意到每次迭代后面的一个方法:keyIterator.remove();。Selector 并不会从 SelectionKey Set 中主动删除 SelectionKey 实例。当我们处理完 Channel 之后,我们必须手动将其删除。等到 Channel 再一次准备就绪时,Selector 又会将其添加进 SelectionKey Set 中去。

SelectionKey.channel() 方法返回的 Channel 对象需要进行强制转换为你需要的类型。例如:ServerSocketChannelSocketChannel 等等。

Close()

当你用完 selector 之后,需要调用 selector.close(); 将会该 selector 并让所有注册的 SelectionKey 变得无效。

不过,Channel 并不会关闭。

完整示例

我们来实现一个 Echo 服务。

EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package one.wangwei.java.nio;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class EchoServer {

public static void main(String[] args) throws Exception {

ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8900));
serverSocket.configureBlocking(false);

Selector selector = Selector.open();
serverSocket.register(selector, SelectionKey.OP_ACCEPT);

ByteBuffer byteBuffer = ByteBuffer.allocate(256);

while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();

if (selectionKey.isAcceptable()) {
register(selector, serverSocket);
}

if (selectionKey.isReadable()) {
answerWithEcho(byteBuffer, selectionKey);
}
iterator.remove();
}
}
}

private static void register(Selector selector, ServerSocketChannel socketChannel) throws IOException {
SocketChannel client = socketChannel.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
}

private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
client.read(buffer);
buffer.flip();
client.write(buffer);
buffer.clear();
}

public static Process start() throws IOException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classpath = System.getProperty("java.class.path");
String className = EchoServer.class.getCanonicalName();
ProcessBuilder processBuilder = new ProcessBuilder(javaBin, "-cp", classpath, className);
return processBuilder.start();
}

}

EchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package one.wangwei.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class EchoClient {

private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;

private EchoClient() {
try {
client = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8900));
buffer = ByteBuffer.allocate(256);
} catch (IOException e) {
e.printStackTrace();
}
}

public static EchoClient start() {
if (instance == null) {
instance = new EchoClient();
}
return instance;
}

public static void stop() throws IOException {
buffer = null;
client.close();
}

public String sendMessage(String msg) {
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try {
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
System.out.println("response=" + response);
buffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
return response;
}

public static void main(String[] args) {
EchoClient client = EchoClient.start();
System.out.println("Enter your message:");
Scanner scanner = new Scanner(System.in);
client.sendMessage(scanner.nextLine());
}
}

参考资料