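Previously, we analyzed how a Netty Pipeline is initialized and how nodes are added to and removed from it. This article analyzes how events propagate through the Pipeline.
Netty version: 4.1.30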
Inbound event propagation

Example

The following example demonstrates how inbound events propagate through a Netty Pipeline.
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class NettyPipelineInboundExample {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup(1);
        ServerBootstrap strap = new ServerBootstrap();
        strap.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(8888))
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Handlers are invoked for inbound events in the order they are added.
                        ch.pipeline().addLast(new InboundHandlerA());
                        ch.pipeline().addLast(new InboundHandlerB());
                        ch.pipeline().addLast(new InboundHandlerC());
                    }
                });
        try {
            ChannelFuture future = strap.bind().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

class InboundHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler A : " + msg);
        ctx.fireChannelRead(msg);
    }
}

class InboundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler B : " + msg);
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Fire a read event from the head of the pipeline as soon as the channel becomes active.
        ctx.channel().pipeline().fireChannelRead("Hello world");
    }
}

class InboundHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler C : " + msg);
        ctx.fireChannelRead(msg);
    }
}
```
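Connect with telnet to the Netty server started above (it listens on port 8888). This triggers the channel active event.
With the handlers added in the order InboundHandlerA, InboundHandlerB, InboundHandlerC, the console prints:

```
InboundHandler A : Hello world
InboundHandler B : Hello world
InboundHandler C : Hello world
```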
If the order in which the handlers are added is swapped, the output order changes accordingly, e.g.:

```java
...

ch.pipeline().addLast(new InboundHandlerB());
ch.pipeline().addLast(new InboundHandlerA());
ch.pipeline().addLast(new InboundHandlerC());

...
```

The output is:

```
InboundHandler B : Hello world
InboundHandler A : Hello world
InboundHandler C : Hello world
```
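Source code analysis

It is strongly recommended that you step through the following flow yourself with your IDE's debugger.
Wait until Netty has started and connect to it with telnet. Only then enable your breakpoints and type any characters in the telnet terminal, so that execution stops in the debugger.
This triggers a channel read event, and the call chain starts at the following entry point: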
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        // Inbound events enter the pipeline at the head node.
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    ...

}
```
This calls the static invokeChannelRead(head, msg) method in AbstractChannelHandlerContext:
```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // Already on the handler's executor thread: invoke directly.
            next.invokeChannelRead(m);
        } else {
            // Otherwise hand the call over to the handler's executor.
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            // This handler is not ready to be invoked: pass the event on.
            fireChannelRead(msg);
        }
    }

    ...

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    ...

    // Walk forward (toward the tail) until the next inbound context is found.
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    ...

}
```
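The forward walk performed by findContextInbound() can be reproduced in a few lines of plain Java. The following is only a minimal sketch under simplifying assumptions: `InboundPropagationSketch`, `Node`, `link`, and the list-returning `fireChannelRead` are hypothetical names made up for illustration, every handler is assumed to forward the event, and threading is ignored. It is not Netty's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the forward walk for inbound events; not Netty's real classes. */
final class InboundPropagationSketch {

    /** One node of the doubly linked list, standing in for a handler context. */
    static final class Node {
        final String name;
        final boolean inbound;   // true if the wrapped handler handles inbound events
        Node prev;
        Node next;

        Node(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }

        /** Same loop shape as findContextInbound(): walk forward to the nearest inbound node. */
        Node findContextInbound() {
            Node ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    }

    /** Links head <-> handlers (in add order) <-> tail and returns the head. */
    static Node link(Node... handlers) {
        Node head = new Node("head", true);   // entry point; it is invoked directly
        Node tail = new Node("tail", true);   // last inbound node, so the walk always terminates
        Node last = head;
        for (Node n : handlers) {
            last.next = n;
            n.prev = last;
            last = n;
        }
        last.next = tail;
        tail.prev = last;
        return head;
    }

    /** Starts a read at the head and records every node that sees it, in order. */
    static List<String> fireChannelRead(Node head) {
        List<String> visited = new ArrayList<>();
        Node ctx = head;
        visited.add(ctx.name);
        while (!ctx.name.equals("tail")) {
            ctx = ctx.findContextInbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        Node head = link(new Node("A", true), new Node("outboundOnly", false), new Node("B", true));
        System.out.println(fireChannelRead(head)); // [head, A, B, tail]
    }
}
```

The node named `outboundOnly` is skipped because its `inbound` flag is false, which is the whole job of the `do ... while (!ctx.inbound)` loop.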
The first node in the Pipeline is HeadContext. It handles the channelRead event by simply passing it on to the next node:

```java
final class HeadContext extends AbstractChannelHandlerContext ... {

    ...

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    ...

}
```
The same steps repeat from node to node, so channelRead(ChannelHandlerContext ctx, Object msg) is invoked on InboundHandlerA, InboundHandlerB, and InboundHandlerC in turn.
Finally the event reaches the last node, TailContext, which handles channelRead as follows:

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        ...

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            onUnhandledInboundMessage(msg);
        }

        ...

    }

    ...

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            // A message nobody consumed is released here so that it does not leak.
            ReferenceCountUtil.release(msg);
        }
    }

    ...

}
```
That is the complete path an inbound message takes through the pipeline.
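SimpleChannelInboundHandler

In the previous example, if a ChannelHandler in the middle neither propagates the channelRead event nor releases the message itself, the message never reaches TailContext and is never released, which eventually leads to a memory leak.
We can also define a ChannelHandler by extending SimpleChannelInboundHandler. Its channelRead method releases the message object after handling it, which prevents this kind of leak.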
```java
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                // Not our message type: pass it on and leave the release to a later node.
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

    ...

}
```
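The effect of the try/finally above can be illustrated with a toy reference counter. This is a sketch only: `AutoReleaseSketch`, `Msg`, `AutoReleasingHandler`, and `leakyChannelRead` are hypothetical names invented for this illustration and model just the counting, not Netty's ByteBuf or ReferenceCountUtil.

```java
/** Toy model of reference counting; all types here are hypothetical, not Netty classes. */
final class AutoReleaseSketch {

    /** Stand-in for a reference-counted message. */
    static final class Msg {
        private int refCnt = 1;

        void release() {
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    /** Template-method handler: subclasses implement channelRead0, release happens in finally. */
    abstract static class AutoReleasingHandler {
        final void channelRead(Msg msg) {
            try {
                channelRead0(msg);
            } finally {
                msg.release();   // runs even if channelRead0 throws
            }
        }

        abstract void channelRead0(Msg msg);
    }

    /** A handler body that uses the message but neither releases nor propagates it. */
    static void leakyChannelRead(Msg msg) {
        // the message is consumed here and then simply dropped
    }

    static int refCntAfterLeakyHandler() {
        Msg msg = new Msg();
        leakyChannelRead(msg);
        return msg.refCnt();
    }

    static int refCntAfterAutoReleasingHandler() {
        Msg msg = new Msg();
        new AutoReleasingHandler() {
            @Override
            void channelRead0(Msg m) {
                // business logic only; no explicit release needed
            }
        }.channelRead(msg);
        return msg.refCnt();
    }

    public static void main(String[] args) {
        System.out.println("leaky handler refCnt = " + refCntAfterLeakyHandler());                  // 1
        System.out.println("auto-releasing handler refCnt = " + refCntAfterAutoReleasingHandler()); // 0
    }
}
```

A count that stays at 1 after the handler returns is the leak described above; the template method brings it back to 0 without the subclass doing anything.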
Outbound event propagation

Next, we analyze how outbound events propagate through the Pipeline, starting from an example.
Example

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class NettyPipelineOutboundExample {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup(1);
        ServerBootstrap strap = new ServerBootstrap();
        strap.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(8888))
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new OutboundHandlerA());
                        ch.pipeline().addLast(new OutboundHandlerB());
                        ch.pipeline().addLast(new OutboundHandlerC());
                    }
                });
        try {
            ChannelFuture future = strap.bind().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerA: " + msg);
        ctx.write(msg, promise);
    }
}

class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerB: " + msg);
        ctx.write(msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Three seconds after this handler is added, start a write from the channel.
        ctx.executor().schedule(() -> {
            ctx.channel().write("Hello world ! ");
        }, 3, TimeUnit.SECONDS);
    }
}

class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutboundHandlerC: " + msg);
        ctx.write(msg, promise);
    }
}
```
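Connect with telnet to the Netty server started above. This triggers handlerAdded on OutboundHandlerB, which schedules a write three seconds later.
With the handlers added in the order OutboundHandlerA, OutboundHandlerB, OutboundHandlerC, the console prints:

```
OutboundHandlerC: Hello world !
OutboundHandlerB: Hello world !
OutboundHandlerA: Hello world !
```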
The output order is exactly the reverse of the order in which the ChannelHandlers were added.
If the order in which the handlers are added is swapped, the output order changes accordingly, e.g.:

```java
...

ch.pipeline().addLast(new OutboundHandlerB());
ch.pipeline().addLast(new OutboundHandlerA());
ch.pipeline().addLast(new OutboundHandlerC());

...
```

The output is:

```
OutboundHandlerC: Hello world !
OutboundHandlerA: Hello world !
OutboundHandlerB: Hello world !
```
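Source code analysis

It is strongly recommended that you step through the following flow yourself with your IDE's debugger.
The write event starts at the channel's write method and propagates from there: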
```java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    ...

    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    ...

}
```
Next, look at the write method of the Pipeline:

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelFuture write(Object msg) {
        // Outbound events enter the pipeline at the tail node.
        return tail.write(msg);
    }

    ...

}
```
This calls the write method of the ChannelHandlerContext:
```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }

    ...

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            // Not on the handler's executor thread: wrap the call in a task and submit it.
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            } else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    // Walk backward (toward the head) until the previous outbound context is found.
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    ...

}
```
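Both invokeChannelRead and write use the same dispatch rule: call the next context directly when already on its executor thread, otherwise submit a task to that executor. The sketch below shows that rule with a plain single-threaded ExecutorService. It rests on assumptions: `EventLoopDispatchSketch`, `invoke`, and `nestedCallRunsInline` are hypothetical names, and the thread-identity check is a simplification rather than Netty's EventExecutor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of "run inline if already on the loop thread, otherwise submit"; not Netty's EventExecutor. */
final class EventLoopDispatchSketch {

    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-loop");
        t.setDaemon(true);      // let the JVM exit without an explicit shutdown
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs a task on the loop thread and reports the name of the thread that executed it. */
    CompletableFuture<String> invoke() {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        Runnable task = () -> ranOn.complete(Thread.currentThread().getName());
        if (inEventLoop()) {
            task.run();             // same thread: call directly, no queueing
        } else {
            executor.execute(task); // other thread: enqueue for the loop thread
        }
        return ranOn;
    }

    /** Calls invoke() from inside the loop thread and reports whether it completed inline. */
    boolean nestedCallRunsInline() {
        CompletableFuture<Boolean> inline = new CompletableFuture<>();
        executor.execute(() -> inline.complete(invoke().isDone()));
        return inline.join();
    }

    public static void main(String[] args) {
        EventLoopDispatchSketch loop = new EventLoopDispatchSketch();
        System.out.println("task ran on: " + loop.invoke().join());
        System.out.println("nested call ran inline: " + loop.nestedCallRunsInline());
    }
}
```

Whichever thread starts the call, the task body always runs on the single loop thread, so handler code in such a design does not need its own locking.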
The event eventually reaches the write method of HeadContext:
```java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
```
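The backward walk can be modelled the same way as the inbound one. The sketch below is a simplified illustration, not Netty code: `OutboundPropagationSketch`, `Node`, `pipeline`, and `writeFrom` are hypothetical names, and every handler is assumed to forward the write. It also shows why the starting point matters: a write that enters at the tail (as `ctx.channel().write(...)` does in the example) passes every outbound handler, while a write started from a context in the middle only reaches the handlers in front of it.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the backward walk for outbound events; not Netty's real classes. */
final class OutboundPropagationSketch {

    static final class Node {
        final String name;
        final boolean outbound;   // true if the wrapped handler handles outbound events
        Node prev;
        Node next;

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        /** Same loop shape as findContextOutbound(): walk backward to the nearest outbound node. */
        Node findContextOutbound() {
            Node ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    /** Links head <-> outbound handlers (in add order) <-> tail and returns all nodes, head first. */
    static List<Node> pipeline(String... outboundHandlerNames) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("head", true));     // final stop of every write
        for (String name : outboundHandlerNames) {
            nodes.add(new Node(name, true));
        }
        nodes.add(new Node("tail", false));    // only used as a starting point here
        for (int i = 1; i < nodes.size(); i++) {
            nodes.get(i).prev = nodes.get(i - 1);
            nodes.get(i - 1).next = nodes.get(i);
        }
        return nodes;
    }

    /** Records the nodes a write visits when it is started from the given node. */
    static List<String> writeFrom(Node start) {
        List<String> visited = new ArrayList<>();
        Node ctx = start;
        while (!ctx.name.equals("head")) {
            ctx = ctx.findContextOutbound();
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        List<Node> nodes = pipeline("A", "B", "C");
        Node tail = nodes.get(nodes.size() - 1);
        Node b = nodes.get(2);
        System.out.println(writeFrom(tail)); // [C, B, A, head]
        System.out.println(writeFrom(b));    // [A, head]
    }
}
```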
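Exception propagation

Now that we understand how inbound and outbound events travel through the Pipeline, let's look at how the Pipeline handles exceptions.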
Example

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class NettyPipelineExceptionCaughtExample {

    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup(1);
        ServerBootstrap strap = new ServerBootstrap();
        strap.group(group)
                .channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(8888))
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new InboundHandlerA());
                        ch.pipeline().addLast(new InboundHandlerB());
                        ch.pipeline().addLast(new InboundHandlerC());
                        ch.pipeline().addLast(new OutboundHandlerA());
                        ch.pipeline().addLast(new OutboundHandlerB());
                        ch.pipeline().addLast(new OutboundHandlerC());
                    }
                });
        try {
            ChannelFuture future = strap.bind().sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    static class InboundHandlerA extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InboundHandlerA.exceptionCaught:" + cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    }

    static class InboundHandlerB extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Every read fails here, which starts the exception propagation.
            throw new Exception("ERROR !!!");
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InboundHandlerB.exceptionCaught:" + cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    }

    static class InboundHandlerC extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("InboundHandlerC.exceptionCaught:" + cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    }

    static class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutboundHandlerA.exceptionCaught:" + cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    }

    static class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutboundHandlerB.exceptionCaught:" + cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    }

    static class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("OutboundHandlerC.exceptionCaught:" + cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    }
}
```
Connect with telnet to the Netty server started above and send any characters from the terminal.
This triggers a channel read event, InboundHandlerB throws, and the console prints:

```
InboundHandlerB.exceptionCaught:ERROR !!!
InboundHandlerC.exceptionCaught:ERROR !!!
OutboundHandlerA.exceptionCaught:ERROR !!!
OutboundHandlerB.exceptionCaught:ERROR !!!
OutboundHandlerC.exceptionCaught:ERROR !!!
```
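The exception is first delivered to InboundHandlerB, where it was thrown, and then travels toward the tail through every remaining handler in the order they were added, regardless of whether a handler is inbound or outbound. InboundHandlerA sits before InboundHandlerB, so it never sees the exception.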
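Source code analysis

In our example, InboundHandlerB's channelRead method throws while the channelRead event is being delivered from InboundHandlerA to InboundHandlerB. The try/catch in invokeChannelRead catches the exception and passes it to notifyHandlerException.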
```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        // Note: the event goes to next directly, without filtering by handler type.
        invokeExceptionCaught(next, cause);
        return this;
    }

    static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to submit an exceptionCaught() event.", t);
                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                }
            }
        }
    }

    ...

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                // An exception thrown by the user handler is caught here.
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    private void notifyHandlerException(Throwable cause) {
        if (inExceptionCaught(cause)) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "An exception was thrown by a user handler " +
                                "while handling an exceptionCaught event", cause);
            }
            return;
        }

        // Deliver the exception to this context's own handler first.
        invokeExceptionCaught(cause);
    }

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                            "An exception {}" +
                                    "was thrown by a user handler's exceptionCaught() " +
                                    "method while handling the following exception:",
                            ThrowableUtil.stackTraceToString(error), cause);
                } else if (logger.isWarnEnabled()) {
                    logger.warn(
                            "An exception '{}' [enable DEBUG level for full stacktrace] " +
                                    "was thrown by a user handler's exceptionCaught() " +
                                    "method while handling the following exception:", error, cause);
                }
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

    ...

}
```
The exception eventually reaches the exceptionCaught method of the TailContext node. If nothing along the way has intercepted and handled it, a warning is logged:

```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        ...

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            onUnhandledInboundException(cause);
        }

        ...

    }

    ...

    protected void onUnhandledInboundException(Throwable cause) {
        try {
            logger.warn(
                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                            "It usually means the last handler in the pipeline did not handle the exception.",
                    cause);
        } finally {
            ReferenceCountUtil.release(cause);
        }
    }

    ...

}
```
In real applications, a dedicated ChannelHandler is usually placed at the end of the Pipeline to handle any exception raised along the way.
Exception handling best practice

Define a dedicated ExceptionCaughtHandler to handle exceptions:

```java
...

class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof Exception) {
            // Handle ordinary exceptions here. The event is not propagated further.
            System.out.println("Successfully caught exception ! ");
        } else {
            // Handle other Throwables (for example Errors) here.
        }
    }
}

...

// Add it last, after all other handlers.
ch.pipeline().addLast(new ExceptionCaughtHandler());

...
```
Output:

```
InboundHandlerB.exceptionCaught:ERROR !!!
InboundHandlerC.exceptionCaught:ERROR !!!
OutboundHandlerA.exceptionCaught:ERROR !!!
OutboundHandlerB.exceptionCaught:ERROR !!!
OutboundHandlerC.exceptionCaught:ERROR !!!
Successfully caught exception !
```
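The path an exception takes, with and without a catch-all handler at the end, can be summarised in a small model. This is a sketch under stated assumptions: `ExceptionPropagationSketch`, `Node`, `chain`, `exceptionPath`, and `run` are hypothetical names, and a node's `forwards` flag stands for "its exceptionCaught calls ctx.fireExceptionCaught(cause)". It is not how Netty stores handlers.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of exceptionCaught propagation; Node is hypothetical, not a Netty class. */
final class ExceptionPropagationSketch {

    static final class Node {
        final String name;
        final boolean forwards;   // true if the handler passes the exception on to the next node
        Node next;

        Node(String name, boolean forwards) {
            this.name = name;
            this.forwards = forwards;
        }
    }

    /** Chains the handlers in add order and appends a tail node that ends the walk. */
    static void chain(List<Node> handlers) {
        Node tail = new Node("tail", false);
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).next = (i + 1 < handlers.size()) ? handlers.get(i + 1) : tail;
        }
    }

    /** Starts at the node whose handler threw and follows next, ignoring inbound/outbound type. */
    static List<String> exceptionPath(Node thrower) {
        List<String> path = new ArrayList<>();
        Node ctx = thrower;
        while (ctx != null) {
            path.add(ctx.name);
            ctx = ctx.forwards ? ctx.next : null;
        }
        return path;
    }

    /** Builds a three-handler pipeline, optionally ending in a catch-all handler, and lets B throw. */
    static List<String> run(boolean withCatchAllHandler) {
        Node inboundA = new Node("InboundHandlerA", true);
        Node inboundB = new Node("InboundHandlerB", true);   // the handler that throws
        Node outboundA = new Node("OutboundHandlerA", true);
        List<Node> handlers = new ArrayList<>(List.of(inboundA, inboundB, outboundA));
        if (withCatchAllHandler) {
            handlers.add(new Node("ExceptionCaughtHandler", false));
        }
        chain(handlers);
        return exceptionPath(inboundB);
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [InboundHandlerB, OutboundHandlerA, tail]
        System.out.println(run(true));  // [InboundHandlerB, OutboundHandlerA, ExceptionCaughtHandler]
    }
}
```

With the catch-all handler in place the walk stops before the tail, which corresponds to the warning from TailContext no longer being logged.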
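Pipeline review and summary

This completes our analysis of how the Pipeline works. As a review, try answering the following questions:
When is the Pipeline created?
How does the Pipeline add and remove nodes?
How does Netty determine the type of a ChannelHandler?
How are exceptions thrown inside a ChannelHandler handled?
In what order should ChannelHandlers be added?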