前面 ,我们分析了Netty Pipeline的初始化及节点添加与删除逻辑。接下来,我们将来分析Pipeline的事件传播机制。
Netty版本:4.1.30
inBound事件传播 示例 我们通过下面这个例子来演示Netty Pipeline的事件传播机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class NettyPipelineInboundExample { public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup(1 ); ServerBootstrap strap = new ServerBootstrap(); strap.group(group) .channel(NioServerSocketChannel.class ) .localAddress (new InetSocketAddress (8888)) .childOption (ChannelOption .TCP_NODELAY , true ) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new InboundHandlerA()); ch.pipeline().addLast(new InboundHandlerB()); ch.pipeline().addLast(new InboundHandlerC()); } }); try { ChannelFuture future = strap.bind().sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } class InboundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandler A : " + msg); ctx.fireChannelRead(msg); } } class InboundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandler B : " + msg); ctx.fireChannelRead(msg); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { ctx.channel().pipeline().fireChannelRead("Hello world" ); } } class InboundHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InboundHandler C : " + msg); ctx.fireChannelRead(msg); } }
源码
通过 telnet 来连接上面启动好的netty服务,触发channel active事件:
按照InboundHandlerA、InboundHandlerB、InboundHandlerC的添加顺序,控制台输出如下信息:
1 2 3 InboundHandler A : Hello world InboundHandler B : Hello world InboundHandler C : Hello world
若是调用它们的添加顺序,则会输出对应顺序的信息,e.g:
1 2 3 4 5 6 7 ... ch.pipeline().addLast(new InboundHandlerB()); ch.pipeline().addLast(new InboundHandlerA()); ch.pipeline().addLast(new InboundHandlerC()); ...
输出如下信息:
1 2 3 InboundHandler B : Hello world InboundHandler A : Hello world InboundHandler C : Hello world
源码分析 强烈建议 下面的流程,自己通过IDE的Debug模式来分析
待netty启动成功,通过telnet连接到netty,然后通过telnet终端输入任意字符(这一步才开启Debug模式),进入Debug模式。
触发channel read事件,从下面的入口开始调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelPipeline fireChannelRead (Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this ; } ... }
调用 AbstractChannelHandlerContext 中的 invokeChannelRead(head, msg)
接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext , ResourceLeakHint { ... static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run () { next.invokeChannelRead(m); } }); } } private void invokeChannelRead (Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this , msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } ... @Override public ChannelHandlerContext fireChannelRead (final Object msg) { invokeChannelRead(findContextInbound(), msg); return this ; } ... private AbstractChannelHandlerContext findContextInbound () { AbstractChannelHandlerContext ctx = this ; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; } ... }
Pipeline中的第一个节点为HeadContext,它对于channelRead事件的处理,是直接往下传播,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 final class HeadContext extends AbstractChannelHandlerContext ... @Override public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception { ctx.fireChannelRead(msg); } ... }
就这样一直循环下去,依次会调用到 InboundHandlerA、InboundHandlerB、InboundHandlerC 中的 channelRead(ChannelHandlerContext ctx, Object msg)
接口。
到最后一个TailContext节点,它对channelRead事件的处理如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class DefaultChannelPipeline implements ChannelPipeline { final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { ... @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); } ... } ... protected void onUnhandledInboundMessage (Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } } ... }
以上就是pipeline对inBound消息的处理流程。
SimpleChannelInboundHandler 在前面的例子中,假如中间有一个ChannelHandler未对channelRead事件进行传播,就会导致消息对象无法得到释放,最终导致内存泄露。
我们还可以继承 SimpleChannelInboundHandler 来自定义ChannelHandler,它的channelRead方法,对消息对象做了msg处理,防止内存泄露。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public abstract class SimpleChannelInboundHandler <I > extends ChannelInboundHandlerAdapter { ... @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true ; try { if (acceptInboundMessage(msg)) { @SuppressWarnings ("unchecked" ) I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false ; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } } ... }
outBound事件传播 接下来,我们来分析Pipeline的outBound事件传播机制。代码示例如下:
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class NettyPipelineOutboundExample { public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup(1 ); ServerBootstrap strap = new ServerBootstrap(); strap.group(group) .channel(NioServerSocketChannel.class ) .localAddress (new InetSocketAddress (8888)) .childOption (ChannelOption .TCP_NODELAY , true ) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new OutboundHandlerA()); ch.pipeline().addLast(new OutboundHandlerB()); ch.pipeline().addLast(new OutboundHandlerC()); } }); try { ChannelFuture future = strap.bind().sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } class OutboundHandlerA extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutboundHandlerA: " + msg); ctx.write(msg, promise); } } class OutboundHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutboundHandlerB: " + msg); ctx.write(msg, promise); } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(() -> { ctx.channel().write("Hello world ! " ); }, 3 , TimeUnit.SECONDS); } } class OutboundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutboundHandlerC: " + msg); ctx.write(msg, promise); } }
源码
通过 telnet 来连接上面启动好的netty服务,触发channel added事件:
按照OutboundHandlerA、OutboundHandlerB、OutboundHandlerC的添加顺序,控制台输出如下信息:
1 2 3 OutboundHandlerC: Hello world ! OutboundHandlerB: Hello world ! OutboundHandlerA: Hello world !
输出的顺序正好与ChannelHandler的添加顺序相反。
若是调用它们的添加顺序,则会输出对应顺序的信息,e.g:
1 2 3 4 5 6 7 ... ch.pipeline().addLast(new InboundHandlerB()); ch.pipeline().addLast(new InboundHandlerA()); ch.pipeline().addLast(new InboundHandlerC()); ...
输出如下信息:
1 2 3 OutboundHandlerC: Hello world ! OutboundHandlerA: Hello world ! OutboundHandlerB: Hello world !
源码分析 强烈建议 下面的流程,自己通过IDE的Debug模式来分析
从channel的write方法开始,往下传播write事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { ... @Override public ChannelFuture write (Object msg) { return pipeline.write(msg); } ... }
接着来看看Pipeline中的write接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DefaultChannelPipeline implements ChannelPipeline { ... @Override public final ChannelFuture write (Object msg) { return tail.write(msg); } ... }
调用ChannelHandlerContext中的write接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext , ResourceLeakHint { ... @Override public ChannelFuture write (Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write (final Object msg, final ChannelPromise promise) { if (msg == null ) { throw new NullPointerException("msg" ); } try { if (isNotValidPromise(promise, true )) { ReferenceCountUtil.release(msg); return promise; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } write(msg, false , promise); return promise; } ... private void write (Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } private AbstractChannelHandlerContext findContextOutbound () { AbstractChannelHandlerContext ctx = this ; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; } private void invokeWrite (Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } } private void invokeWrite0 (Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this , msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } ... }
最终会调用到HeadContext的write接口:
1 2 3 4 5 @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
异常传播 了解了Pipeline的入站与出站事件的机制之后,我们再来看看Pipeline的异常处理机制。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 public class NettyPipelineExceptionCaughtExample { public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup(1 ); ServerBootstrap strap = new ServerBootstrap(); strap.group(group) .channel(NioServerSocketChannel.class ) .localAddress (new InetSocketAddress (8888)) .childOption (ChannelOption .TCP_NODELAY , true ) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new InboundHandlerA()); ch.pipeline().addLast(new InboundHandlerB()); ch.pipeline().addLast(new InboundHandlerC()); ch.pipeline().addLast(new OutboundHandlerA()); ch.pipeline().addLast(new OutboundHandlerB()); ch.pipeline().addLast(new OutboundHandlerC()); } }); try { ChannelFuture future = strap.bind().sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } static class InboundHandlerA extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("InboundHandlerA.exceptionCaught:" + cause.getMessage()); ctx.fireExceptionCaught(cause); } } static class InboundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { throw new Exception("ERROR !!!" ); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("InboundHandlerB.exceptionCaught:" + cause.getMessage()); ctx.fireExceptionCaught(cause); } } static class InboundHandlerC extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("InboundHandlerC.exceptionCaught:" + cause.getMessage()); ctx.fireExceptionCaught(cause); } } static class OutboundHandlerA extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("OutboundHandlerA.exceptionCaught:" + cause.getMessage()); ctx.fireExceptionCaught(cause); } } static class OutboundHandlerB extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("OutboundHandlerB.exceptionCaught:" + cause.getMessage()); ctx.fireExceptionCaught(cause); } } static class OutboundHandlerC extends ChannelOutboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("OutboundHandlerC.exceptionCaught:" + cause.getMessage()); ctx.fireExceptionCaught(cause); } } }
源码
通过 telnet 来连接上面启动好的netty服务,并在控制台发送任意字符:
触发channel read事件并抛出异常,控制台输出如下信息:
1 2 3 4 5 InboundHandlerB.exceptionCaught:ERROR !!! InboundHandlerC.exceptionCaught:ERROR !!! OutboundHandlerA.exceptionCaught:ERROR !!! OutboundHandlerB.exceptionCaught:ERROR !!! OutboundHandlerC.exceptionCaught:ERROR !!!
可以看到异常的捕获与我们添加的ChannelHandler顺序相同。
源码分析 在我们的示例中,InboundHandlerB的ChannelRead接口抛出异常,导致从InboundHandlerA将ChannelRead事件传播到InboundHandlerB的过程中出现异常,异常被捕获。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext , ResourceLeakHint { ... @Override public ChannelHandlerContext fireExceptionCaught (final Throwable cause) { invokeExceptionCaught(next, cause); return this ; } static void invokeExceptionCaught (final AbstractChannelHandlerContext next, final Throwable cause) { ObjectUtil.checkNotNull(cause, "cause" ); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); } else { try { executor.execute(new Runnable() { @Override public void run () { next.invokeExceptionCaught(cause); } }); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event." , t); logger.warn("The exceptionCaught() event that was failed to submit was:" , cause); } } } } ... private void invokeChannelRead (Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this , msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } private void notifyHandlerException (Throwable cause) { if (inExceptionCaught(cause)) { if (logger.isWarnEnabled()) { logger.warn( "An exception was thrown by a user handler " + "while handling an exceptionCaught event" , cause); } return ; } invokeExceptionCaught(cause); } private void invokeExceptionCaught (final Throwable cause) { if (invokeHandler()) { try { handler().exceptionCaught(this , cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug( "An exception {}" + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:" , ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:" , error, cause); } } } else { fireExceptionCaught(cause); } } ... }
最终会调用到TailContext节点的exceptionCaught接口,如果我们中途没有对异常进行拦截处理,做会打印出一段警告信息!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class DefaultChannelPipeline implements ChannelPipeline { ... final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { ... @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } ... protected void onUnhandledInboundException (Throwable cause) { try { logger.warn( "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception." , cause); } finally { ReferenceCountUtil.release(cause); } } } ... }
在实际的应用中,一般会定一个ChannelHandler,放置Pipeline末尾,专门用来处理中途出现的各种异常。
最佳异常处理实践 单独定义ExceptionCaughtHandler来处理异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 ... class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof Exception) { System.out.println("Successfully caught exception ! " ); } else { } } } ... ch.pipeline().addLast(new ExceptionCaughtHandler()); ...
输出:
1 2 3 4 5 6 InboundHandlerB.exceptionCaught:ERROR !!! InboundHandlerC.exceptionCaught:ERROR !!! OutboundHandlerA.exceptionCaught:ERROR !!! OutboundHandlerB.exceptionCaught:ERROR !!! OutboundHandlerC.exceptionCaught:ERROR !!! Successfully caught exception !
Pipeline回顾与总结 至此,我们对Pipeline的原理的解析就完成了。
Pipeline是在什么时候创建的? Pipeline添加与删除节点的逻辑是怎么样的? netty是如何判断ChannelHandler类型的? 如何处理ChannelHandler中抛出的异常? 对于ChannelHandler的添加应遵循什么样的顺序? 参考资料