前面,我们大致了解了 Netty 中的几个核心组件。今天我们就来先来介绍 Netty 的网络通信组件,用于执行网络 I/O 操作 —— Channel

Netty 版本:4.1.30

概述

数据在网络中总是以字节的形式进行流通。我们在进行网络编程时选用何种传输方式编码(OIO、NIO 等)决定了这些字节的传输方式。

在没有 Netty 之前,为了提升系统的并发能力,从 OIO 切换到 NIO 时,需要对代码进行大量的重构,因为相应的 Java NIO 与 IO API 大不相同。而 Netty 在这些 Java 原生 API 的基础上做了一层封装,对用户提供了高度抽象而又统一的 API,从而让传输方式的切换不在变得困难,只需要直接使用即可,而不需要对整个代码进行重构。

Netty Channel UML

netty channel 族如下:

NettyChannel

整个族群中,AbstractChannel 是最为关键的一个抽象类,从它继承出了 AbstractNioChannel、AbstractOioChannel、AbstractEpollChannel、LocalChannel、EmbeddedChannel 等类,每个类代表了不同的协议以及相应的 IO 模型。除了 TCP 协议以外,Netty 还支持很多其他的连接协议,并且每种协议还有 NIO (异步 IO) 和 OIO (Old-IO,即传统的阻塞 IO) 版本的区别。不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:

  • NioSocketChannel:代表异步的客户端 TCP Socket 连接
  • NioServerSocketChannel:异步的服务器端 TCP Socket 连接
  • NioDatagramChannel:异步的 UDP 连接
  • NioSctpChannel:异步的客户端 Sctp 连接
  • NioSctpServerChannel:异步的 Sctp 服务器端连接
  • OioSocketChannel:同步的客户端 TCP Socket 连接
  • OioServerSocketChannel:同步的服务器端 TCP Socket 连接
  • OioDatagramChannel:同步的 UDP 连接
  • OioSctpChannel:同步的 Sctp 服务器端连接
  • OioSctpServerChannel:同步的客户端 TCP Socket 连接

Channel API

我们先来看下最顶层接口 channel 主要的 API,常用的如下:

接口名 描述
eventLoop() Channel 需要注册到 EventLoop 的多路复用器上,用于处理 I/O 事件,通过 eventLoop () 方法可以获取到 Channel 注册的 EventLoop。EventLoop 本质上就是处理网络读写事件的 Reactor 线程。在 Netty 中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义 NioTask 等任务。
pipeline() 返回 channel 分配的 ChannelPipeline
isActive() 判断 channel 是否激活。激活的意义取决于底层的传输类型。例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的
localAddress() 返回本地的 socket 地址
remoteAddress() 返回远程的 socket 地址
flush() 将之前已写的数据冲刷到底层 Channel 上去
write(Object msg) 请求将当前的 msg 通过 ChannelPipeline 写入到目标 Channel 中。注意,write 操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用 flush 操作才会被写入到 Channel 中,发送给对方。
writeAndFlush() 等同于调用 write () 并接着调用 flush ()
metadate() 熟悉 TCP 协议的读者可能知道,当创建 Socket 的时候需要指定 TCP 参数,例如接收和发送的 TCP 缓冲区大小,TCP 的超时时间。是否重用地址等。在 Netty 中,每个 Channel 对应一个物理链接,每个连接都有自己的 TCP 参数配置。所以,Channel 会聚合一个 ChannelMetadata 用来对 TCP 参数提供元数据描述信息,通过 metadata () 方法就可以获取当前 Channel 的 TCP 参数配置。
read() 从当前的 Channel 中读取数据到第一个 inbound 缓冲区中,如果数据被成功读取,触发 ChannelHandler.channelRead (ChannelHandlerContext,Object) 事件。读取操作 API 调用完成后,紧接着会触发 ChannelHander.channelReadComplete(ChannelHandlerContext)事件,这样业务的 ChannelHandler 可以决定是否需要继续读取数据。如果已经有操作请求被挂起,则后续的读操作会被忽略。
close(ChannelPromise promise) 主动关闭当前连接,通过 ChannelPromise 设置操作结果并进行结果通知,无论操作是否成功,都可以通过 ChannelPromise 获取操作结果。该操作会级联触发 ChannelPipeline 中所有 ChannelHandler 的 ChannelHandler.close (ChannelHandlerContext,ChannelPromise) 事件。
parent() 对于服务端 Channel 而言,它的父 Channel 为空;对于客户端 Channel,它的父 Channel 就是创建它的 ServerSocketChannel。
id() 返回 ChannelId 对象,ChannelId 是 Channel 的唯一标识。

Channel 创建

对 Netty Channel API 以及相关的类有了一个初步了解之后,接下来我们来详细了解一下在 Netty 的启动过程中 Channel 是如何创建的。服务端 Channel 的创建过程,主要分为四个步骤:1)Channel 创建;2)Channel 初始化;3)Channel 注册;4)Channel 绑定。

Netty Channel Process

我们以下面的代码为例进行解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 创建两个线程组,专门用于网络事件的处理,Reactor线程组
// 用来接收客户端的连接,
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 用来进行SocketChannel的网络读写
EventLoopGroup workGroup = new NioEventLoopGroup();

// 创建辅助启动类ServerBootstrap,并设置相关配置:
ServerBootstrap b = new ServerBootstrap();
// 设置处理Accept事件和读写操作的事件循环组
b.group(bossGroup, workGroup)
// 配置Channel类型
.channel(NioServerSocketChannel.class)
// 配置监听地址
.localAddress(new InetSocketAddress(port))
// 设置服务器通道的选项,设置TCP属性
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
// 设置建立连接后的客户端通道的选项
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// channel属性,便于保存用户自定义数据
.attr(AttributeKey.newInstance("UserId"), "60293")
.handler(new LoggingHandler(LogLevel.INFO))
// 设置子处理器,主要是用户的自定义处理器,用于处理IO网络事件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});

// 调用bind()方法绑定端口,sync()会阻塞等待处理请求。这是因为bind()方法是一个异步过程,会立即返回一个ChannelFuture对象,调用sync()会等待执行完成
ChannelFuture f = b.bind().sync();
// 获得Channel的closeFuture阻塞等待关闭,服务器Channel关闭时closeFuture会完成
f.channel().closeFuture().sync();

调用 channel () 接口设置 AbstractBootstrap 的成员变量 channelFactory,该变量顾名思义就是用于创建 channel 的工厂类。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

...

public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
// 创建 channelFactory
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

...

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}

if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}

...

channelFactory 设置为 ReflectiveChannelFactory ,在我们这个例子中 clazz 为 NioServerSocketChannel ,我们可以看到其中有个 newChannel () 接口,通过反射的方式来调用,这个接口的调用处我们后面会介绍到。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Channel工厂类
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Class<? extends T> clazz;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
// 通过反射来进行常见Channel实例
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}

接下来我们来看下 NioServerSocketChannel 的构造函数,主要就是:

  • 生成 ServerSocketChannel 对象。NioServerSocketChannel 创建时,首先使用 SelectorProvider 的 openServerSocketChannel 打开服务器套接字通道。SelectorProvider 是 Java 的 NIO 提供的抽象类,是选择器和可选择通道的服务提供者。具体的实现类有 SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。选择器的主要工作是根据操作系统类型和版本选择合适的 Provider:如果 LInux 内核版本 >=2.6 则,具体的 SelectorProvider 为 EPollSelectorProvider,否则为默认的 PollSelectorProvider。
  • 设置 ServerSocketChannelConfig 成员变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// 调用JDK底层API生成 ServerSocketChannel 对象实例
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}

private final ServerSocketChannelConfig config;

public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

public NioServerSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}

public NioServerSocketChannel(ServerSocketChannel channel) {
// 调用 AbstractNioChannel 构造器,创建 NioServerSocketChannel,设置SelectionKey为ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
// 创建ChannleConfig对象,主要是TCP参数配置类
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

AbstractNioChannel 的构造器如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 调用 AbstractChannel 构造器
super(parent);
this.ch = ch;
// 从上一步过来,这里设置为 SelectionKey.OP_ACCEPT
this.readInterestOp = readInterestOp;
try {
// 设置为非阻塞状态
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

AbstractChannel 构造器中,会设 Channel 关联的三个核心对象:ChannelId、ChannelPipeline、Unsafe。

  • 初始化 ChannelId,ChannelId 是一个全局唯一的值;
  • 创建 NioMessageUnsafe 实例,该类为 Channel 提供了用于完成网络通讯相关的底层操作,如 connect (),read (),register (),bind (),close () 等;
  • 为 Channel 创建 DefaultChannelPipeline,初始事件传播管道。关于 Pipeline 的分析,请看 后文 的分析。
1
2
3
4
5
6
7
8
9
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 设置ChannelId
id = newId();
// 设置Unsafe
unsafe = newUnsafe();
// 设置Pipeline
pipeline = newChannelPipeline();
}

从 NioServerSocketChannelConfig 的构造函数追溯下去,在 DefaultChannelConfig 会设置 channel 成员变量。

1
2
3
4
5
6
7
8
9
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}

protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
setRecvByteBufAllocator(allocator, channel.metadata());
// 绑定channel
this.channel = channel;
}

以上就是 channel 创建的过程,总结一下:

  • 通过 ReflectiveChannelFactory 工厂类,以反射的方式对 channel 进行创建;
  • channel 创建的过程中,会创建四个重要的对象:ChannelId、ChannelConfig、ChannelPipeline、Unsafe。

Channel 初始化

主要分为以下两步:

  • 将启动器(Bootstrap)设置的选项和属性设置到 NettyChannel 上面
  • 向 Pipeline 添加初始化 Handler,供注册后使用

我们从 AbstractBootstrap 的 bind () 接口进去,调用链:bind () —> doBind (localAddress) —> initAndRegister () —> init (Channel channel),我们看下 ServerBootstrap 中 init () 接口的实现:

1
2
3
4
5
6
7
8
9
10
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 调用Channel工程类的newChannel()接口,创建channel,就是前面我们讲的部分内容
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
....
}

初始化 Channel,我们来重点看下 init (channel) 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void init(Channel channel) throws Exception {
// 获取启动器 启动时配置的option参数,主要是TCP的一些属性
final Map<ChannelOption<?>, Object> options = options0();
// 将获得到 options 配置到 ChannelConfig 中去
synchronized (options) {
setChannelOptions(channel, options, logger);
}

// 获取 ServerBootstrap 启动时配置的 attr 参数
final Map<AttributeKey<?>, Object> attrs = attrs0();
// 配置 Channel attr,主要是设置用户自定义的一些参数
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

// 获取channel中的 pipeline,这个pipeline使我们前面在channel创建过程中设置的 pipeline
ChannelPipeline p = channel.pipeline();

// 将启动器中配置的 childGroup 保存到局部变量 currentChildGroup
final EventLoopGroup currentChildGroup = childGroup;
// 将启动器中配置的 childHandler 保存到局部变量 currentChildHandler
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
// 保存用户设置的 childOptions 到局部变量 currentChildOptions
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
// 保存用户设置的 childAttrs 到局部变量 currentChildAttrs
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 获取启动器上配置的handler
ChannelHandler handler = config.handler();
if (handler != null) {
// 添加 handler 到 pipeline 中
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 用child相关的参数创建出一个新连接接入器ServerBootstrapAcceptor
// 通过 ServerBootstrapAcceptor 可以将一个新连接绑定到一个线程上去
// 每次有新的连接进来 ServerBootstrapAcceptor 都会用child相关的属性对它们进行配置,并注册到ChaildGroup上去
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

对于新连接接入器 ServerBootstrapAcceptor 的分析 ,请查看 后文

Channel 注册

在 channel 完成创建和初始化之后,接下来就需要将其注册到事件轮循器 Selector 上去。我们回到 initAndRegister 接口上去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final ChannelFuture initAndRegister() {

...

// 获取 EventLoopGroup ,并调用它的 register 方法来注册 channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

最终会向下调用到 SingleThreadEventLoop 中的 register 接口:

如何调用到这里,里面的细节需要等到后面文章讲到 MultithreadEventExecutorGroup 再详细说明

1
2
3
4
5
6
7
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 调用unsafe的register接口
promise.channel().unsafe().register(this, promise);
return promise;
}

代码跟踪下去,直到 AbstractChannel 中的 AbstractUnsafe 这个类中的 register 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

// 将该Channel与eventLoop 进行绑定,后续与该channel相关的IO操作都由eventLoop来处理
AbstractChannel.this.eventLoop = eventLoop;
// 初次注册时 eventLoop.inEventLoop() 返回false
if (eventLoop.inEventLoop()) {
// 调用实际的注册接口register0
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 调用实际的注册接口register0
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

register0 接口主要分为以下三段逻辑:

  • doRegister();

  • pipeline.invokeHandlerAddedIfNeeded();

  • pipeline.fireChannelRegistered();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 调用 doRegister() 接口
doRegister();
neverRegistered = false;
registered = true;

// 通过pipeline的传播机制,触发handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 通过pipeline的传播机制,触发channelRegistered事件
pipeline.fireChannelRegistered();
// 还没有绑定,所以这里的 isActive() 返回false.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

我们来看 AbstractNioChannel 中的 doRegister () 接口,最终调用的就是 Java JDK 底层的 NIO API 来注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// eventLoop().unwrappedSelector():获取selector,将在后面介绍 EventLoop 创建时会讲到
// 将selector注册到Java NIO Channel上
// ops 设置为 0,表示不关心任何事件
// att 设置为 channel自身,表示后面还会将channel取出来用作它用(后面文章会讲到)
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

Channel 绑定

在完成创建、初始化以及注册之后,接下来就是 Channel 绑定操作。

本小节涉及到的 pipeline 事件传播机制,我们放到后面的文章中去讲解。

从启动器的 bind () 接口开始,往下调用 doBind () 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化及注册
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 调用 doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
....
}
}

doBind 方法又会调用 doBind0 () 方法,在 doBind0 () 方法中会通过 EventLoop 去执行 channel 的 bind () 任务,关于 EventLoop 的 execute 接口的分析,请看后面的 文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 调用channel.bind接口
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

doBind0 () 方法往下会条用到 pipeline.bind(localAddress, promise); 方法,通过 pipeline 的传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.bind () 方法,这个方法主要做两件事情:

  • 调用 doBind ():调用底层 JDK API 进行 Channel 的端口绑定。
  • 调用 pipeline.fireChannelActive ():

关于 Pipeline 的传播机制,请看 后文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

....

// wasActive 在绑定成功前为 false
boolean wasActive = isActive();
try {
// 调用doBind()调用JDK底层API进行端口绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 完成绑定之后,isActive() 返回true
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发channelActive事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}

我们这里看服务端 NioServerSocketChannel 实现的 doBind 方法,最终会调用 JDK 底层 NIO Channel 的 bind 方法:

1
2
3
4
5
6
7
8
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

调用 pipeline.fireChannelActive (),开始传播 active 事件,pipeline 首先就会调用 HeadContext 节点进行事件传播,会调用到 DefaultChannelPipeline.HeadContext.channelActive () 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 触发heanlder 的 ChannelActive 方法
ctx.fireChannelActive();
// 调用接口readIfIsAutoRead
readIfIsAutoRead();
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
// 调用channel.read()
channel.read();
}
}

channel.read () 方法往下会调用到 AbstractChannelHandlerContext.read () 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public ChannelHandlerContext read() {
// 获取下一个ChannelHandlerContext节点
final AbstractChannelHandlerContext next = findContextOutbound();
// 获取EventExecutor
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 调用下一个节点的invokeRead接口
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}

通过 pipeline 的事件传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.beginRead () 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
// 调用 doBeginRead();
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}

我们看下 AbstractNioChannel 对 doBeginRead 接口的实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 注册一个OP_ACCEPT
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
// 获取channel注册是的设置的 selectionKey
final SelectionKey selectionKey = this.selectionKey;
// selectionKey无效则返回
if (!selectionKey.isValid()) {
return;
}

readPending = true;
// 前面讲到channel在注册的时候,这是 interestOps 设置的是 0
final int interestOps = selectionKey.interestOps();
// readInterestOp 在前面讲到channel创建的时候,设置值为 SelectionKey.OP_ACCEPT
if ((interestOps & readInterestOp) == 0) {
// 最终 selectionKey 的兴趣集就会设置为 SelectionKey.OP_ACCEPT
// 表示随时可以接收新连接的接入
selectionKey.interestOps(interestOps | readInterestOp);
}
}

总结

至此,我们就分析完了 Channel 的创建、初始化、注册、绑定的流程。其中涉及到的 EventLoopGroup 和 Pipeline 事件传播机制的知识点,我们放到后面的文章中去讲解。

参考资料