前面,我们分析了 Netty EventLoop 的 创建启动 原理,接下里我们来分析 Netty 中另外两个重要组件 —— ChannelHandler 与 **Pipeline**。Netty 中 I/O 事件的传播机制均由它负责,下面我们来看看它是如何实现的。

Netty 版本:4.1.30

我们前面在讲 Channel 创建 时,在 AbstractChannel 的构造函数中, 一笔带过地提到了 Pipeline,现在我们来深入分析一下它的原理。

概述

Netty channel lifecycle

前面,我们在分析 Netty channel 源码时,分析了 Channel 的创建、初始化、注册、绑定过程。在 Netty 中,channel 的生命周期如下所示:

Channel lifecycle

  • ChannelRegistered:Channel 注册到了 EventLoop 上
  • ChannelActive:Channel 激活,连接到了远程某一个节点上,可以收发数据了
  • ChannelInactive:断开连接
  • ChannelUnregistered:Channel 从 EventLoop 上取消注册

Netty channelHandler

Channel 每一次状态的变化,都会产生一个对应的事件,并且都会触发 ChannelHandler 中对应的方法进行处理,例如,当一个 Channel 成功注册到 EventLoop 上后,会有触发 channelHanndler 中的 handlerRegistered (…) 来处理该事件。

ChannelHandlerUML

在上面的 ChannelHandler UML 中,最为重要的两个 ChannelHandler:

Netty ChannelPipeline

前面我们在分析 Channel 创建过程时,每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。ChannelPipeline 是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链,如图所示:

Channel-Pipeline

一个 Channel 包含了一个 ChannelPipeline,ChannelPipeline 内部是一个双向的链表结构,内部由一个个的 ChannelHandlerContext 节点组成,ChannelPipeline 有头尾两个固定的节点 HeadContext 与 TailContext。用户自定的 ChannelHandler 就是由 ChannelHandlerContext 包装成 Pipeline 的节点,参与 Channel 整个生命周期中所触发的入站事件与出站事件以及相应数据流的拦截处理。

根据事件的起源,事件将会被 ChannelInboundHandler (入站处理器) 或者 ChannelOutboundHandler (出站处理器) 处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个 ChannelHandler,如图所示:

Channel Pipeline Event Flow

Pipeline UML

我们先来看下 ChannelPipeline 以及 ChannelHandlerContext 的类图结构,它们都实现了 ChannelInboundInvokerChannelOutboundInvoker 接口。

Netty Pipeline UML

Pipeline 初始化

AbstractChannel 构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 创建默认Pipeline
pipeline = newChannelPipeline();
}

// 创建默认Pipeline
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline 构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

// 设置尾部节点
tail = new TailContext(this);
// 设置头部节点
head = new HeadContext(this);

// 将tail与head串联起来
head.next = tail;
tail.prev = head;
}

我们可以看到 Pipeline 其实是一个双向链表的结构,刚刚初始化的时候,Pipeline (管道) 中只有两个节点,如图:

Pipeline Init Ctx

接下来我们看看组成 Pipeline 节点的对象 —— ChannelHandlerContext。

ChannelHandlerContext

ChannelHandlerContext 实现了 AttributeMap、ChannelInboundInvoker、ChannelOutboundInvoker 接口。Pipeline 中的事件传播,都是由 ChannelHandlerContext 负责,将发生的事件从一个节点传到下一个节点。

ChannelHandlerContext 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

// 返回ChannelHandlerContext中绑定的Channel
Channel channel();

// 返回专用于执行任务的 EventExecutor
EventExecutor executor();

// 返回ChannelHandlerContext的唯一名称。该名字将在ChannelHandler被添加到ChannelPipeline时会被用到,从ChannelPipeline中访问注册的ChannelHandler时,也会被用到。
String name();

// 返回ChannelHandlerContext中绑定的ChannelHandler
ChannelHandler handler();

// 属于这个ChannelHandlerContext的ChannelHandler从ChannelPipeline移除了,返回true
boolean isRemoved();

@Override
ChannelHandlerContext fireChannelRegistered();

@Override
ChannelHandlerContext fireChannelUnregistered();

@Override
ChannelHandlerContext fireChannelActive();

@Override
ChannelHandlerContext fireChannelInactive();

@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);

@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);

@Override
ChannelHandlerContext fireChannelRead(Object msg);

@Override
ChannelHandlerContext fireChannelReadComplete();

@Override
ChannelHandlerContext fireChannelWritabilityChanged();

@Override
ChannelHandlerContext read();

@Override
ChannelHandlerContext flush();

// 返回分配的ChannelPipeline
ChannelPipeline pipeline();

// 返回用于分配ByteBuf的ByteBufAllocator
ByteBufAllocator alloc();

}

AttributeMap 接口

实现 AttributeMap 接口,表示 ChannelHandlerContext 节点可以存储自定义的属性。

1
2
3
4
5
6
7
// 属性Map接口
public interface AttributeMap {
// 通过Key获取属性
<T> Attribute<T> attr(AttributeKey<T> key);
// 判断属性是否存在
<T> boolean hasAttr(AttributeKey<T> key);
}

ChannelInboundInvoker 接口

实现 ChannelInboundInvoker 接口,表示节点可以用于传播入站相关的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public interface ChannelInboundInvoker {
// 当Channel注册到EventLoop上时
// 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelRegistered();

// 当Channel从EventLoop上取消注册
// 调用ChannelPipeline中下一个ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelUnregistered();

// 当Channel处理激活状态,意味着连接已经建立
// 调用ChannelPipeline中下一个ChannelInboundHandler的channelActive(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelActive();

// 当Channel处理失效状态,意味着连接已经断开
// 调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelInactive();

// 在pipeline中某个一个入站(inbound)操作出现了异常
// 调用ChannelPipeline中下一个ChannelInboundHandler的exceptionCaught(ChannelHandlerContext)方法
ChannelInboundInvoker fireExceptionCaught(Throwable cause);

// 收到用户自定义的事件
// 调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered(ChannelHandlerContext)方法
ChannelInboundInvoker fireUserEventTriggered(Object event);

// Channel接收到了消息
// 调用ChannelPipeline中下一个ChannelInboundHandler的channelRead(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelRead(Object msg);

// 调用ChannelPipeline中下一个ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelReadComplete();

// 调用ChannelPipeline中下一个ChannelInboundHandler的channelWritabilityChanged(ChannelHandlerContext)方法
ChannelInboundInvoker fireChannelWritabilityChanged();
}

ChannelOutboundInvoker 接口

实现 ChannelOutboundInvoker 接口,意味着节点可以用来处理出站相关的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public interface ChannelOutboundInvoker {

// 将Channel绑定到一个本地地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

// 将Channel连接到一个远程地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandler的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

// 将Channel断开连接。这将调用ChannelPipeline中的下一个ChannelOutbound- Handler的disconnect(ChannelHandlerContext, Channel Promise)方法
ChannelFuture disconnect();
ChannelFuture disconnect(ChannelPromise promise);

// 将Channel关闭。这将调用ChannelPipeline中的下一个ChannelOutbound- Handler的close(ChannelHandlerContext, ChannelPromise)方法
ChannelFuture close();
ChannelFuture close(ChannelPromise promise);

// 将Channel从它先前所分配的EventExecutor(即EventLoop)中注销。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的deregister (ChannelHandlerContext, ChannelPromise)方法
ChannelFuture deregister();
ChannelFuture deregister(ChannelPromise promise);

// 请求从Channel中读取更多的数据。这将调用ChannelPipeline中的下一个ChannelOutboundHandler的read(ChannelHandlerContext)方法
ChannelOutboundInvoker read();

// 将消息写入Channel。这将调用ChannelPipeline中的下一个Channel- OutboundHandler的write(ChannelHandlerContext, Object msg, Channel- Promise)方法。注意:这并不会将消息写入底层的Socket,而只会将它放入队列中。要将它写入Socket,需要调用flush()或者writeAndFlush()方法
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);

// 冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个Channel- OutboundHandler的flush(ChannelHandlerContext)方法
ChannelOutboundInvoker flush();

// 这是一个先调用write()方法再接着调用flush()方法的便利方法
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);

ChannelPromise newPromise();

ChannelProgressivePromise newProgressivePromise();

ChannelFuture newSucceededFuture();

ChannelFuture newFailedFuture(Throwable cause);

ChannelPromise voidPromise();
}

TailContext & HeadContext

接下来,我们看看 Pipeline 中的头部与尾部节点。

TailContext 节点

TailContext 是尾部节点,inbound 类型,主要处理 Pipeline 中数据流的收尾工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

TailContext(DefaultChannelPipeline pipeline) {
// 调用AbstractChannelHandlerContext构造器
// TailContext是一个inbound(入站)节点
super(pipeline, null, TAIL_NAME, true, false);
// 设置添加完成
setAddComplete();
}

// 返回Handler,就是它自身
@Override
public ChannelHandler handler() {
return this;
}

...

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}

...
}

// 如果pipeline中有异常没做处理,最终会由TailContext打赢一个警告日志
protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
// 释放对象
ReferenceCountUtil.release(cause);
}
}

// 如果pipeline中有read消息没有处理,最终会由TailContext打赢一个警告日志
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}

// 设置 ChannelHandlerContext 状态为添加完成,状态=2
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}

AbstractChannelHandlerContext

AbstractChannelHandlerContext 是 ChannelHandlerContext 的抽象实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

...

// 下一个节点
volatile AbstractChannelHandlerContext next;
// 上一个节点
volatile AbstractChannelHandlerContext prev;

// 是否为inBound类型
private final boolean inbound;
// 是否为outbound类型
private final boolean outbound;
// 绑定的默认pipeline
private final DefaultChannelPipeline pipeline;
// 节点名
private final String name;
private final boolean ordered;

// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;

...

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
// 设置HandlerContext名称
this.name = ObjectUtil.checkNotNull(name, "name");
// 绑定pipeline
this.pipeline = pipeline;
// 绑定executor(这里为null)
this.executor = executor;
// 如果节点为inbound类型就设置为true
this.inbound = inbound;
// 如果节点为outbound类型就设置为true
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}

...

}

DefaultChannelHandlerContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

private final ChannelHandler handler;

DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
// 调用 AbstractChannelHandlerContext 构造函数
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}

@Override
public ChannelHandler handler() {
return handler;
}

// 是否为inBound类型
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}

// 是否为outBound类型
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}

HeadContext

HeadContext 是头部节点,outbound 类型,用于传播事件和进行一些底层 socket 操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {
// 调用AbstractChannelHandlerContext构造器
// HeadContext是一个outbound(出站)节点
super(pipeline, null, HEAD_NAME, false, true);
// 设置Unsafe对象
unsafe = pipeline.channel().unsafe();
// 设置添加完成
setAddComplete();
}

// 返回ChannelHandler,就只它自身
@Override
public ChannelHandler handler() {
return this;
}

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
// 调用 unsafe 进行bind操作
unsafe.bind(localAddress, promise);
}

@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
// 调用 unsafe 进行 connect 操作
unsafe.connect(remoteAddress, localAddress, promise);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 调用 unsafe 进行 disconnect 操作
unsafe.disconnect(promise);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 调用 unsafe 进行 close 操作
unsafe.close(promise);
}

@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
// 调用 unsafe 进行 deregister 操作
unsafe.deregister(promise);
}

@Override
public void read(ChannelHandlerContext ctx) {
// 调用 unsafe 进行 read 操作
unsafe.beginRead();
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 调用 unsafe 进行 write 操作
unsafe.write(msg, promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// 调用 unsafe 进行 flush 操作
unsafe.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 传播ExceptionCaught事件
ctx.fireExceptionCaught(cause);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
// 传播channelRegistered事件
ctx.fireChannelRegistered();
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
// 传播channelUnregistered事件
ctx.fireChannelUnregistered();

// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 传播 channelActive 事件
ctx.fireChannelActive();
// 在 https://wangwei.one/posts/netty-channel-source-analyse.html 中分析过了
// 主要是在channel激活之后,向底层的selector注册一个SelectionKey.OP_ACCEPT监听事件
// 这样channel在连接之后,就可以监听到一个read事件
readIfIsAutoRead();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 传播 channelInactive 事件
ctx.fireChannelInactive();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 传播 channelRead 事件
ctx.fireChannelRead(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 传播 channelReadComplete 事件
ctx.fireChannelReadComplete();
//
readIfIsAutoRead();
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 传播 userEventTriggered 事件
ctx.fireUserEventTriggered(evt);
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 传播 channelWritabilityChanged 事件
ctx.fireChannelWritabilityChanged();
}
}

Pipeline 节点添加

上面我们分析了 Pipeline 的基本结构,接下来我们看看 Pipeline 添加节点(也就是 Handler 处理器)的过程。该过程主要分为三步:

  • 判断是否重复添加
  • 创建节点并添加至链表
  • 回调添加完成事件

以这段常见的代码为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 添加 serverHandler
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();

我们从 ChannelPipeline.addLast() 方法进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public class DefaultChannelPipeline implements ChannelPipeline {

...

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// 循环处理
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查是否重复
checkMultiplicity(handler);
// 创建新节点
newCtx = newContext(group, filterName(name, handler), handler);
// 添加新节点
addLast0(newCtx);

// 如果 registered 为 false,则表示这个channel还未注册到EventLoop上.
// 在这种情况下,我们添加一个Task到PendingHandlerCallback中,
// 等到这个channel注册成功之后,将会调用立即调用 ChannelHandler.handlerAdded(...) 方法,已达到channel添加的目的
if (!registered) {
// 设置为待添加状态
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

// 获取executor
EventExecutor executor = newCtx.executor();

if (!executor.inEventLoop()) {
// 设置为待添加状态
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
// 回调添加完成事件
callHandlerAdded0(newCtx);
}
});
return this;
}
}
// 回调添加完成事件
callHandlerAdded0(newCtx);
return this;
}

// 检查是否重复
private static void checkMultiplicity(ChannelHandler handler) {
// handler是否为ChannelHandlerAdapter类型,不是则不做处理
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// 判断handler是否添加了Sharable注解 && 是否添加过了
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}

// 创建新的节点
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// 调用DefaultChannelHandlerContext的构造函数
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

// 在tail节点之前添加新节点
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

// 回调ChannelHandler中的handlerAdded方法
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// 我们必须在handlerAdded方法之前调用setAddComplete方法。否则的话,一旦handlerAdded方法产生了任何pipeline事件,由于状态的缘故,ctx.handler()将会丢失这些事件的处理。
// 设置新节点的状态为添加完成状态
ctx.setAddComplete();
// 调用handlerAdded接口
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
...
// 如果添加失败,则删除新节点
remove0(ctx);
...
}
}

...

}

我们来看下 setAddComplete () 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

...

// 通过自旋操作,设置状态为ADD_COMPLETE
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}

...

// 设置为 ADD_PENDING 状态
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated;
// This should always be true as it MUST be called before setAddComplete() or setRemoved().
}

...
}

回调用户自定义 Handler 中的 handlerAdded 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

...

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.printf("ServerHandler added ....");
}

...

}

ChannelInitializer

关于回调 ChannelHandler 中的 handlerAdded () 方法,最常见的一个场景就是,使用 ChannelInitializer 来添加我们自定义的 ChannelHandler。ChannelInitializer 被添加完成之后,会回调到它的 initChannel 方法。

接下来,我们看看 ChannelInitializer 这个类,它是一个特殊的 ChannelInboundHandler,它提供了一种在 Channel 注册到 EventLoop 后初始化 Channel 的简便方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();

/**
* 当 ch 注册成功之后,该方法就会被调用,该方法结束返回之后,此ChannelInitializer实例将会从Channel所绑定的ChannelPipeline中移除
*
* @param ch 所注册的Channel
*
*/
protected abstract void initChannel(C ch) throws Exception;

...

// ChannelInitializer 添加成功后,会回调到handlerAdded()接口
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
initChannel(ctx);
}
}

@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 标记ctx为true,且之前没有标记过。防止重复执行
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
// 调用initChannel方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
// 最终会删除 ChannelInitializer 实例
remove(ctx);
}
return true;
}
return false;
}

// 删除 ChannelInitializer 实例
private void remove(ChannelHandlerContext ctx) {
try {
// 获取 Pipeline
ChannelPipeline pipeline = ctx.pipeline();
// 从 Pipeline 中返回 ChannelInitializer 实例
if (pipeline.context(this) != null) {
// 删除 ChannelInitializer 实例
// 删除逻辑请看下一小节
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}

}

遍历 ChannelHandlerContext 节点查询出 ChannelHandler 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DefaultChannelPipeline implements ChannelPipeline {

...

// 通过handler获取ChannelHandlerContext
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}

AbstractChannelHandlerContext ctx = head.next;
for (;;) {

if (ctx == null) {
return null;
}

if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}

...

}

Pipeline 中除了 addLast 方法外, 还有 addFirst、addBefore、addAfter 等方法,逻辑类似,可以自行研究学习。

Pipeline 节点删除

上面,我们讲了 Pipeline 节点的添加,这小结我们看看 Pipeline 节点的删除功能。

netty 有个最大的特性之一就是 Handler 可插拔,做到动态编织 pipeline,比如在首次建立连接的时候,需要通过进行权限认证,在认证通过之后,就可以将此 context 移除,下次 pipeline 在传播事件的时候就就不会调用到权限认证处理器。

下面是权限认证 Handler 最简单的实现,第一个数据包传来的是认证信息,如果校验通过,就删除此 Handler,否则,直接关闭连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 鉴权Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {

...

@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}

private boolean verify(ByteBuf byteBuf) {
//...
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("AuthHandler has been removed ! ");
}
}

我们来看看 DefaultChannelPipeline 中的 remove 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class DefaultChannelPipeline implements ChannelPipeline {

...

// 从Pipeline中删除ChannelHandler
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}

...

// 获取 ChannelHandler ,获取不到就抛出异常
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}

...

// 删除
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
// ctx不能为heand与tail
assert ctx != head && ctx != tail;

synchronized (this) {
// 从pipeline中删除ChannelHandlerContext节点
remove0(ctx);

// 如果为false,则表明channel还没有注册到eventloop上
// 在删除这种场景下,我们先添加一个Task,一旦channel注册成功就会调用这个Task,这个Task就会立即调用ChannelHandler.handlerRemoved(...)方法,来从pipeline中删除context。
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}

EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
// 回调 handlerRemoved 方法
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
// 回调 handlerRemoved 方法
callHandlerRemoved0(ctx);
return ctx;
}

...

// 删除节点 ChannelHandlerContext
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}

...

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
// 回调 handlerRemoved 方法
// 也就是我们前面例子 AuthHandler 中的 handlerRemoved() 方法
ctx.handler().handlerRemoved(ctx);
} finally {
// 设置为ctx 状态为 REMOVE_COMPLETE
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}

...

}

好了, 删除的逻辑就分析到这里了。

小结

这一讲我们分析了 Pipeline 的创建过程,了解 Pipeline 中的链表结构以及每个节点的数据结构。还分析了 Pipeline 是如何添加节点的,又是如何删除节点的。接下来 ,我们会分析 Pipeline 如何进行事件传播的。

参考资料