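Previously, we analyzed how a Netty Pipeline is initialized and how its nodes are added and removed. Next, we analyze the Pipeline's event propagation mechanism.

Netty version: 4.1.30

Inbound event propagation

Example

The following example demonstrates the Netty Pipeline's event propagation mechanism.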

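import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class NettyPipelineInboundExample {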

public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap strap = new ServerBootstrap();
strap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(8888))
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InboundHandlerA());
ch.pipeline().addLast(new InboundHandlerB());
ch.pipeline().addLast(new InboundHandlerC());
}
});
try {
ChannelFuture future = strap.bind().sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}

class InboundHandlerA extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandler A : " + msg);
// Propagate the read event to the next ChannelHandler
ctx.fireChannelRead(msg);
}

}

class InboundHandlerB extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandler B : " + msg);
// Propagate the read event to the next ChannelHandler
ctx.fireChannelRead(msg);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// When the channel becomes active, fire a channelRead event that propagates down from the pipeline's HeadContext node
ctx.channel().pipeline().fireChannelRead("Hello world");
}
}

class InboundHandlerC extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandler C : " + msg);
// Propagate the read event to the next ChannelHandler
ctx.fireChannelRead(msg);
}
}

Source code

Connect to the Netty server started above with telnet to trigger the channel active event:

$ telnet 127.0.0.1 8888

Following the order in which InboundHandlerA, InboundHandlerB, and InboundHandlerC were added, the console prints:

InboundHandler A : Hello world
InboundHandler B : Hello world
InboundHandler C : Hello world

If we change the order in which the handlers are added, the output order changes accordingly, e.g.:

...

ch.pipeline().addLast(new InboundHandlerB());
ch.pipeline().addLast(new InboundHandlerA());
ch.pipeline().addLast(new InboundHandlerC());

...

The output is:

InboundHandler B : Hello world
InboundHandler A : Hello world
InboundHandler C : Hello world

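Source code analysis

It is strongly recommended that you step through the flow below yourself in your IDE's debug mode.

Once Netty has started, connect to it with telnet, then type any characters in the telnet terminal to enter the debugger (enable your breakpoints only at this step).

This triggers a channel read event, and the call chain starts at the following entry point: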

public class DefaultChannelPipeline implements ChannelPipeline {

...

// Fire the channel read event
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
// Propagate the read event down the pipeline, starting from the head node
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

...

}

This calls the static invokeChannelRead(head, msg) method in AbstractChannelHandlerContext:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

...

// Invoke channelRead on the given context
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// Null-check the message; touch() records a hint for leak detection
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// Get the EventExecutor
EventExecutor executor = next.executor();
// true in this walkthrough: we are already on the event loop thread
if (executor.inEventLoop()) {
// Call the instance method below: invokeChannelRead(Object msg)
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// handler(): returns the ChannelHandler of the current node; the first is HeadContext, the last is TailContext
// Call the handler's channelRead method
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

...

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// Calls back into the static invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) above
invokeChannelRead(findContextInbound(), msg);
return this;
}

...

// Find the next inbound ChannelHandlerContext
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
// Move to the next node
ctx = ctx.next;
// Repeat until the node is of inbound type
} while (!ctx.inbound);
return ctx;
}

...
}

The first node in the Pipeline is HeadContext. It handles the channelRead event by simply propagating it onward:

final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

...

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// HeadContext propagates the channelRead event onward by
// calling ChannelHandlerContext.fireChannelRead(final Object msg)
ctx.fireChannelRead(msg);
}

...
}

The process repeats in this way, calling channelRead(ChannelHandlerContext ctx, Object msg) on InboundHandlerA, InboundHandlerB, and InboundHandlerC in turn.

At the final TailContext node, the channelRead event is handled as follows:

public class DefaultChannelPipeline implements ChannelPipeline {

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

...

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Delegate to onUnhandledInboundMessage
onUnhandledInboundMessage(msg);
}

...

}

...

// Last-resort handling for an inbound message that no handler consumed
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// Decrement msg's reference count by 1; when it reaches 0, the object's memory is released
ReferenceCountUtil.release(msg);
}
}

...

}

That is how the pipeline processes inbound messages.
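The head-to-tail walk traced above can be reproduced with a small, self-contained model. This is only a sketch under stated assumptions: `Node`, `link`, and `fireChannelRead` here are hypothetical stand-ins written for illustration, not Netty's classes, and threading, handler invocation, and reference counting are all left out. It shows only how a `findContextInbound()`-style loop skips nodes that are not inbound.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of inbound propagation; hypothetical names, not Netty's classes. */
public class InboundPropagationSketch {

    /** One node of the doubly linked list, loosely modeled on a handler context. */
    static final class Node {
        final String name;
        final boolean inbound;
        Node prev;
        Node next;

        Node(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }

        /** Same idea as findContextInbound(): step forward until an inbound node is found. */
        Node findNextInbound() {
            Node ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    }

    /** Links the nodes in the given order and returns the first one. */
    static Node link(Node... nodes) {
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
        return nodes[0];
    }

    /** Starts at head and records every node the read event visits, ending at tail. */
    static List<String> fireChannelRead(Node head, Node tail) {
        List<String> visited = new ArrayList<>();
        Node ctx = head;
        visited.add(ctx.name);
        while (ctx != tail) {
            ctx = ctx.findNextInbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    /** Builds head -> A -> (outbound-only X) -> B -> C -> tail and fires one read. */
    public static List<String> run() {
        // The walk starts at head directly, so head's inbound flag is never consulted here.
        Node head = new Node("head", false);
        Node tail = new Node("tail", true);
        link(head,
             new Node("InboundHandlerA", true),
             new Node("OutboundHandlerX", false),
             new Node("InboundHandlerB", true),
             new Node("InboundHandlerC", true),
             tail);
        return fireChannelRead(head, tail);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The outbound-only node `OutboundHandlerX` is added purely to show the skip: it sits between A and B in the list but never appears in the visited order.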

SimpleChannelInboundHandler

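In the previous example, if a ChannelHandler in the middle does not propagate the channelRead event (and does not release the message itself), the message object never reaches TailContext and is never released, which eventually causes a memory leak.

Alternatively, we can extend SimpleChannelInboundHandler to write a custom ChannelHandler. Its channelRead method releases the message object after handling it, which prevents the leak.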

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
...

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
// Decrement msg's reference count by 1; when it reaches 0, the object's memory is released
ReferenceCountUtil.release(msg);
}
}
}

...

}
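To make the try/finally release pattern in the listing above concrete, here is a minimal sketch that uses only the JDK. `CountedMessage` and `ReadCallback` are hypothetical types invented for this illustration; they merely imitate a reference-counted message and the `channelRead0` hook, and are not part of Netty.

```java
/** Simplified model of the auto-release pattern; hypothetical types, not Netty's classes. */
public class AutoReleaseSketch {

    /** Hypothetical stand-in for a reference-counted message such as a ByteBuf. */
    static final class CountedMessage {
        private int refCnt = 1;

        int refCnt() {
            return refCnt;
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            refCnt--;
        }
    }

    /** Handler callback; the name channelRead0 mirrors the hook in the listing above. */
    interface ReadCallback {
        void channelRead0(CountedMessage msg);
    }

    /** Runs the callback and always releases the message afterwards. */
    static void channelReadWithAutoRelease(CountedMessage msg, ReadCallback callback) {
        try {
            callback.channelRead0(msg);
        } finally {
            msg.release(); // runs even when the callback throws
        }
    }

    /** Returns the reference count left after an auto-releasing read. */
    public static int refCntAfterAutoRelease(boolean callbackThrows) {
        CountedMessage msg = new CountedMessage();
        try {
            channelReadWithAutoRelease(msg, m -> {
                if (callbackThrows) {
                    throw new IllegalStateException("handler failed");
                }
            });
        } catch (IllegalStateException expected) {
            // The callback failed, but the finally block has already released msg.
        }
        return msg.refCnt();
    }

    /** Returns the reference count left when a handler neither forwards nor releases. */
    public static int refCntAfterSwallowing() {
        CountedMessage msg = new CountedMessage();
        ReadCallback swallow = m -> { };
        swallow.channelRead0(msg);
        return msg.refCnt();
    }

    public static void main(String[] args) {
        System.out.println("auto-release, normal return: " + refCntAfterAutoRelease(false));
        System.out.println("auto-release, callback throws: " + refCntAfterAutoRelease(true));
        System.out.println("swallowed without release: " + refCntAfterSwallowing());
    }
}
```

In this model the count drops to 0 on both the normal and the failing path, while the handler that swallows the message leaves it at 1, which corresponds to the leak described earlier.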

Outbound event propagation

Next, we analyze the Pipeline's outbound event propagation mechanism. The example code is as follows:

Example

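import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class NettyPipelineOutboundExample {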

public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap strap = new ServerBootstrap();
strap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(8888))
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new OutboundHandlerA());
ch.pipeline().addLast(new OutboundHandlerB());
ch.pipeline().addLast(new OutboundHandlerC());
}
});
try {
ChannelFuture future = strap.bind().sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}

class OutboundHandlerA extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Print the message
System.out.println("OutboundHandlerA: " + msg);
// Propagate the write event to the next node
ctx.write(msg, promise);
}
}

class OutboundHandlerB extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Print the message
System.out.println("OutboundHandlerB: " + msg);
// Propagate the write event to the next node
ctx.write(msg, promise);
}


@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 3 seconds after the handlerAdded event fires, simulate a write event
ctx.executor().schedule(() -> {
// ctx.write("Hello world ! ");
ctx.channel().write("Hello world ! ");
}, 3, TimeUnit.SECONDS);
}
}

class OutboundHandlerC extends ChannelOutboundHandlerAdapter {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Print the message
System.out.println("OutboundHandlerC: " + msg);
// Propagate the write event to the next node
ctx.write(msg, promise);
}
}

Source code

Connect to the Netty server started above with telnet to trigger the handlerAdded event:

$ telnet 127.0.0.1 8888

With OutboundHandlerA, OutboundHandlerB, and OutboundHandlerC added in that order, the console prints:

OutboundHandlerC: Hello world ! 
OutboundHandlerB: Hello world !
OutboundHandlerA: Hello world !

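The output order is exactly the reverse of the order in which the ChannelHandlers were added.

If we change the order in which the handlers are added, the output order changes accordingly, e.g.: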

...

ch.pipeline().addLast(new OutboundHandlerB());
ch.pipeline().addLast(new OutboundHandlerA());
ch.pipeline().addLast(new OutboundHandlerC());

...

The output is:

OutboundHandlerC: Hello world ! 
OutboundHandlerA: Hello world !
OutboundHandlerB: Hello world !

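Source code analysis

It is strongly recommended that you step through the flow below yourself in your IDE's debug mode.

The write event starts at the channel's write method and propagates from there: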

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

...

@Override
public ChannelFuture write(Object msg) {
// Delegate to the pipeline to propagate the write event
return pipeline.write(msg);
}

...

}

Next, look at the write method in the Pipeline:

public class DefaultChannelPipeline implements ChannelPipeline {

...

@Override
public final ChannelFuture write(Object msg) {
// Start propagating from the tail node
return tail.write(msg);
}

...

}

This calls the write method in ChannelHandlerContext:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

...

@Override
public ChannelFuture write(Object msg) {
// Delegate to the write overload below
return write(msg, newPromise());
}

@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}

try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// Delegate to the private write(msg, flush, promise) below
write(msg, false, promise);

return promise;
}

...

private void write(Object msg, boolean flush, ChannelPromise promise) {
// Find the next outbound ChannelHandlerContext
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
// Call invokeWrite(Object msg, ChannelPromise promise)
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

// Find the next outbound ChannelHandlerContext by walking backward through prev
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}

private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// Continue with invokeWrite0
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
// Get the current ChannelHandler and call its write method.
// handler() returns OutboundHandlerC, OutboundHandlerB, OutboundHandlerA in turn
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}

...

}

Eventually the write method of HeadContext is called:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Use unsafe to perform the actual write
unsafe.write(msg, promise);
}
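The tail-to-head walk can be modeled in the same spirit. The sketch below is an illustration only: `Node`, `write`, and `buildPipeline` are hypothetical names, not Netty's API. It also contrasts the two calls that appear in OutboundHandlerB's handlerAdded method: a write that starts at the tail (like `ctx.channel().write(...)`) and a write that starts at the current node (like the commented-out `ctx.write(...)`), since the private write method shown above calls findContextOutbound() on the current context.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of outbound propagation; hypothetical names, not Netty's classes. */
public class OutboundPropagationSketch {

    /** One node of the doubly linked list, loosely modeled on a handler context. */
    static final class Node {
        final String name;
        final boolean outbound;
        Node prev;
        Node next;

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        /** Same idea as findContextOutbound(): step backward until an outbound node is found. */
        Node findPrevOutbound() {
            Node ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    /** Walks backward from start (exclusive) to head, recording each outbound node visited. */
    static List<String> write(Node start, Node head) {
        List<String> visited = new ArrayList<>();
        Node ctx = start;
        while (ctx != head) {
            ctx = ctx.findPrevOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    /** Builds head <-> A <-> B <-> C <-> tail; index 0 is head, index 4 is tail. */
    static Node[] buildPipeline() {
        Node[] nodes = {
            new Node("head", true),
            new Node("OutboundHandlerA", true),
            new Node("OutboundHandlerB", true),
            new Node("OutboundHandlerC", true),
            new Node("tail", false)
        };
        for (int i = 0; i + 1 < nodes.length; i++) {
            nodes[i].next = nodes[i + 1];
            nodes[i + 1].prev = nodes[i];
        }
        return nodes;
    }

    /** Models a write that enters the pipeline at the tail. */
    public static List<String> channelWrite() {
        Node[] p = buildPipeline();
        return write(p[4], p[0]);
    }

    /** Models a write issued from OutboundHandlerB's own context. */
    public static List<String> contextWriteFromB() {
        Node[] p = buildPipeline();
        return write(p[2], p[0]);
    }

    public static void main(String[] args) {
        System.out.println("from tail: " + channelWrite());
        System.out.println("from B:    " + contextWriteFromB());
    }
}
```

In this model a write that starts at the tail passes through C, B, and A before reaching head, whereas a write that starts at B's context only reaches A and head. That difference is the reason the example uses the channel-level write to make all three handlers print.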

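Exception propagation

Now that we understand how inbound and outbound events travel through the Pipeline, let's look at the Pipeline's exception handling mechanism.

Example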

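import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class NettyPipelineExceptionCaughtExample {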

public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap strap = new ServerBootstrap();
strap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(8888))
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InboundHandlerA());
ch.pipeline().addLast(new InboundHandlerB());
ch.pipeline().addLast(new InboundHandlerC());
ch.pipeline().addLast(new OutboundHandlerA());
ch.pipeline().addLast(new OutboundHandlerB());
ch.pipeline().addLast(new OutboundHandlerC());
}
});
try {
ChannelFuture future = strap.bind().sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}

static class InboundHandlerA extends ChannelInboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InboundHandlerA.exceptionCaught:" + cause.getMessage());
ctx.fireExceptionCaught(cause);
}
}

static class InboundHandlerB extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new Exception("ERROR !!!");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InboundHandlerB.exceptionCaught:" + cause.getMessage());
ctx.fireExceptionCaught(cause);
}
}

static class InboundHandlerC extends ChannelInboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("InboundHandlerC.exceptionCaught:" + cause.getMessage());
ctx.fireExceptionCaught(cause);
}
}


static class OutboundHandlerA extends ChannelOutboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutboundHandlerA.exceptionCaught:" + cause.getMessage());
ctx.fireExceptionCaught(cause);
}

}

static class OutboundHandlerB extends ChannelOutboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutboundHandlerB.exceptionCaught:" + cause.getMessage());
ctx.fireExceptionCaught(cause);
}
}

static class OutboundHandlerC extends ChannelOutboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("OutboundHandlerC.exceptionCaught:" + cause.getMessage());
ctx.fireExceptionCaught(cause);
}
}

}

Source code

Connect to the Netty server started above with telnet and send any characters from the terminal:

$ telnet 127.0.0.1 8888

This triggers a channel read event, which throws an exception, and the console prints:

InboundHandlerB.exceptionCaught:ERROR !!!
InboundHandlerC.exceptionCaught:ERROR !!!
OutboundHandlerA.exceptionCaught:ERROR !!!
OutboundHandlerB.exceptionCaught:ERROR !!!
OutboundHandlerC.exceptionCaught:ERROR !!!

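As we can see, the exception is delivered in the same order in which the ChannelHandlers were added, starting from the handler that threw it. InboundHandlerA, which sits before the throwing handler, is not notified.

Source code analysis

In our example, InboundHandlerB's channelRead method throws an exception. The exception is therefore raised while the channelRead event is being propagated from InboundHandlerA to InboundHandlerB, and it is caught in invokeChannelRead: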

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {

...

@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
// Call the static invokeExceptionCaught method on the next node
invokeExceptionCaught(next, cause);
return this;
}

static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
ObjectUtil.checkNotNull(cause, "cause");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// Call invokeExceptionCaught on the next node
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
}
});
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to submit an exceptionCaught() event.", t);
logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
}
}
}
}

...

private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// The handler throws here
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
// Catch the exception and propagate it down the pipeline
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

// Notify the handler that an exception occurred
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
// Continue by calling invokeExceptionCaught on the current context
invokeExceptionCaught(cause);
}


private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
// Call the current ChannelHandler's exceptionCaught method.
// In our example this calls InboundHandlerB, InboundHandlerC,
// OutboundHandlerA, OutboundHandlerB, and OutboundHandlerC in turn
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
if (logger.isDebugEnabled()) {
logger.debug(
"An exception {}" +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:",
ThrowableUtil.stackTraceToString(error), cause);
} else if (logger.isWarnEnabled()) {
logger.warn(
"An exception '{}' [enable DEBUG level for full stacktrace] " +
"was thrown by a user handler's exceptionCaught() " +
"method while handling the following exception:", error, cause);
}
}
} else {
fireExceptionCaught(cause);
}
}

...

}

Eventually the exceptionCaught method of the TailContext node is called. If the exception was not intercepted and handled along the way, a warning message is logged:

public class DefaultChannelPipeline implements ChannelPipeline {

...

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

...

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}

...

}

...

protected void onUnhandledInboundException(Throwable cause) {
try {
logger.warn(
"An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
"It usually means the last handler in the pipeline did not handle the exception.",
cause);
} finally {
ReferenceCountUtil.release(cause);
}
}

...

}
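The notification order traced above can be summarized with a deliberately small model. It is a sketch under assumptions, not Netty code: the pipeline is reduced to a list of handler names in add order, `propagate` is a hypothetical helper, and every handler except an optional terminal one is assumed to forward the exception with `ctx.fireExceptionCaught(cause)`, as the handlers in the example do.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of exceptionCaught ordering; hypothetical helper, not Netty's API. */
public class ExceptionPropagationSketch {

    /**
     * Returns the handlers whose exceptionCaught would run, in order.
     *
     * @param handlers handler names in the order they were added
     * @param thrower  the handler whose channelRead throws
     * @param terminal a handler that handles the exception without forwarding it, or null
     */
    public static List<String> propagate(List<String> handlers, String thrower, String terminal) {
        int start = handlers.indexOf(thrower);
        if (start < 0) {
            throw new IllegalArgumentException("unknown handler: " + thrower);
        }
        List<String> notified = new ArrayList<>();
        // Notification starts at the throwing handler itself and follows the next links,
        // regardless of whether a node is inbound or outbound.
        for (int i = start; i < handlers.size(); i++) {
            String name = handlers.get(i);
            notified.add(name);
            if (name.equals(terminal)) {
                return notified; // handled here, nothing reaches the tail
            }
        }
        notified.add("TailContext"); // unhandled: the tail logs the warning
        return notified;
    }

    public static void main(String[] args) {
        List<String> handlers = List.of(
                "InboundHandlerA", "InboundHandlerB", "InboundHandlerC",
                "OutboundHandlerA", "OutboundHandlerB", "OutboundHandlerC");
        System.out.println(propagate(handlers, "InboundHandlerB", null));
    }
}
```

Passing the name of a final handler as `terminal` models a handler placed at the end of the pipeline that handles the exception without forwarding it, so the walk stops before TailContext.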

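In real applications, it is common to define a dedicated ChannelHandler and place it at the end of the Pipeline to handle any exceptions raised along the way.

Exception handling best practice

Define a separate ExceptionCaughtHandler to handle exceptions: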

...

class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Exception) {
// TODO
System.out.println("Successfully caught exception ! ");
} else {
// TODO
}
}
}

...

ch.pipeline().addLast(new ExceptionCaughtHandler());

...

Output:

InboundHandlerB.exceptionCaught:ERROR !!!
InboundHandlerC.exceptionCaught:ERROR !!!
OutboundHandlerA.exceptionCaught:ERROR !!!
OutboundHandlerB.exceptionCaught:ERROR !!!
OutboundHandlerC.exceptionCaught:ERROR !!!
Successfully caught exception ! // log line confirming the exception was caught

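Pipeline review and summary

This completes our analysis of how the Pipeline works. As a review, consider the following questions:

  • When is the Pipeline created?
  • How does the logic for adding and removing Pipeline nodes work?
  • How does Netty determine the type of a ChannelHandler?
  • How are exceptions thrown in a ChannelHandler handled?
  • In what order should ChannelHandlers be added?

References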