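Previously, we analyzed how Netty's EventLoop is created and started. Next, we look at two other important Netty components: ChannelHandler and **Pipeline**. Together they are responsible for propagating I/O events in Netty, so let's see how they are implemented.
Netty version: 4.1.30
When we covered Channel creation earlier, we mentioned the Pipeline only in passing while reading the AbstractChannel constructor. Now we analyze how it works in depth.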
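Overview

Netty channel lifecycle

Earlier, when reading the Netty channel source code, we analyzed how a Channel is created, initialized, registered, and bound. In Netty, a channel goes through the following lifecycle states:
ChannelRegistered: the Channel has been registered with an EventLoop
ChannelActive: the Channel is active, that is, connected to a remote peer, and can send and receive data
ChannelInactive: the connection has been closed
ChannelUnregistered: the Channel has been deregistered from its EventLoop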
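Netty channelHandler

Every state change of a Channel produces a corresponding event, and each event triggers the matching method on the ChannelHandler. For example, once a Channel has been successfully registered with an EventLoop, the channelRegistered (…) method of the ChannelHandler is invoked to handle that event.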
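The mapping from state changes to callbacks can be sketched with plain Java. This is a toy model, not Netty code: `LifecycleListener`, `RecordingListener`, and `ToyChannel` are hypothetical names invented for illustration, and only the four callback names mirror the lifecycle states listed above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the lifecycle callbacks; this is not Netty's interface. */
interface LifecycleListener {
    void channelRegistered();
    void channelActive();
    void channelInactive();
    void channelUnregistered();
}

/** Records the name of every callback it receives, in order. */
class RecordingListener implements LifecycleListener {
    final List<String> events = new ArrayList<>();

    @Override public void channelRegistered()   { events.add("channelRegistered"); }
    @Override public void channelActive()       { events.add("channelActive"); }
    @Override public void channelInactive()     { events.add("channelInactive"); }
    @Override public void channelUnregistered() { events.add("channelUnregistered"); }
}

/** Toy channel: each state change fires exactly one callback. */
class ToyChannel {
    private final LifecycleListener listener;

    ToyChannel(LifecycleListener listener) { this.listener = listener; }

    void register()   { listener.channelRegistered(); }
    void connect()    { listener.channelActive(); }
    void disconnect() { listener.channelInactive(); }
    void deregister() { listener.channelUnregistered(); }
}

class LifecycleDemo {
    /** Drives one full lifecycle and returns the callbacks that were observed. */
    static List<String> run() {
        RecordingListener listener = new RecordingListener();
        ToyChannel channel = new ToyChannel(listener);
        channel.register();
        channel.connect();
        channel.disconnect();
        channel.deregister();
        return listener.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In real Netty code the callbacks also receive a ChannelHandlerContext, which is what lets a handler forward the event to the next node.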
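In the ChannelHandler UML above, the two most important ChannelHandlers are ChannelInboundHandler, which handles inbound events, and ChannelOutboundHandler, which handles outbound events.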
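Netty ChannelPipeline

As we saw when analyzing Channel creation, every newly created Channel is assigned a new ChannelPipeline. A ChannelPipeline is a chain of ChannelHandler instances that intercepts the inbound and outbound events flowing through a Channel, as shown in the figure:
A Channel contains one ChannelPipeline. Internally, the ChannelPipeline is a doubly linked list of ChannelHandlerContext nodes, with two fixed nodes at its ends: HeadContext and TailContext. Each user-defined ChannelHandler is wrapped in a ChannelHandlerContext to become a Pipeline node, and takes part in intercepting and processing the inbound and outbound events, and the associated data flow, triggered throughout the Channel's lifecycle.
Depending on its origin, an event is handled by either a ChannelInboundHandler (inbound handler) or a ChannelOutboundHandler (outbound handler). It is then forwarded, through a call on a ChannelHandlerContext implementation, to the next ChannelHandler of the same supertype, as shown in the figure: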
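To make "the next handler of the same supertype" concrete, here is a minimal sketch in plain Java. It assumes the usual Netty convention that inbound events travel from the head towards the tail and outbound operations travel from the tail towards the head. `ToyNode` and `DirectionDemo` are hypothetical names, and the real lookup logic is not part of this article.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy pipeline node; the two flags mirror the inbound/outbound distinction. */
class ToyNode {
    final String name;
    final boolean inbound;
    final boolean outbound;
    ToyNode next;
    ToyNode prev;

    ToyNode(String name, boolean inbound, boolean outbound) {
        this.name = name;
        this.inbound = inbound;
        this.outbound = outbound;
    }
}

class DirectionDemo {
    /** Links the nodes in the given order and returns them as a list. */
    static List<ToyNode> link(ToyNode... nodes) {
        List<ToyNode> list = new ArrayList<>();
        ToyNode prev = null;
        for (ToyNode node : nodes) {
            node.prev = prev;
            if (prev != null) {
                prev.next = node;
            }
            prev = node;
            list.add(node);
        }
        return list;
    }

    /** Inbound events walk forward and visit only inbound nodes. */
    static List<String> inboundPath(ToyNode first) {
        List<String> path = new ArrayList<>();
        for (ToyNode n = first; n != null; n = n.next) {
            if (n.inbound) {
                path.add(n.name);
            }
        }
        return path;
    }

    /** Outbound operations walk backward and visit only outbound nodes. */
    static List<String> outboundPath(ToyNode last) {
        List<String> path = new ArrayList<>();
        for (ToyNode n = last; n != null; n = n.prev) {
            if (n.outbound) {
                path.add(n.name);
            }
        }
        return path;
    }

    /** Three sample nodes: two inbound, one outbound. */
    static List<ToyNode> sample() {
        return link(new ToyNode("decoder", true, false),
                    new ToyNode("encoder", false, true),
                    new ToyNode("business", true, false));
    }

    public static void main(String[] args) {
        List<ToyNode> nodes = sample();
        System.out.println(inboundPath(nodes.get(0)));                 // [decoder, business]
        System.out.println(outboundPath(nodes.get(nodes.size() - 1))); // [encoder]
    }
}
```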
Pipeline UML

Let's first look at the class diagram of ChannelPipeline and ChannelHandlerContext. Both are interfaces that extend ChannelInboundInvoker and ChannelOutboundInvoker.
Pipeline initialization

The AbstractChannel constructor is as follows:

```java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // Every Channel gets its own pipeline at construction time
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
```
The DefaultChannelPipeline constructor is as follows:

```java
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    // Create the two fixed end nodes
    tail = new TailContext(this);
    head = new HeadContext(this);

    // Link them into an (initially empty) doubly linked list
    head.next = tail;
    tail.prev = head;
}
```
As we can see, the Pipeline is a doubly linked list. Right after initialization it contains only two nodes, head and tail, as shown in the figure:
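The initial state can be reproduced with a few lines of plain Java. This is a simplified sketch rather than Netty's implementation: `SentinelNode` and `SentinelPipeline` are hypothetical names, and the nodes carry only a name.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy list node that carries only a name. */
class SentinelNode {
    final String name;
    SentinelNode prev;
    SentinelNode next;

    SentinelNode(String name) { this.name = name; }
}

/** Toy pipeline that starts with two fixed end nodes linked to each other. */
class SentinelPipeline {
    final SentinelNode head = new SentinelNode("head");
    final SentinelNode tail = new SentinelNode("tail");

    SentinelPipeline() {
        // Same wiring as in the constructor above: head <-> tail
        head.next = tail;
        tail.prev = head;
    }

    /** True while no user node sits between head and tail. */
    boolean isEmpty() {
        return head.next == tail;
    }

    /** Node names from head to tail, inclusive. */
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (SentinelNode n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }
}

class SentinelDemo {
    public static void main(String[] args) {
        SentinelPipeline pipeline = new SentinelPipeline();
        System.out.println(pipeline.names() + " empty=" + pipeline.isEmpty());
    }
}
```

Because the two end nodes always exist, later insertions and removals never have to handle an empty list or a null neighbour.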
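Next, let's look at the object that makes up each Pipeline node: ChannelHandlerContext.

ChannelHandlerContext

ChannelHandlerContext extends the AttributeMap, ChannelInboundInvoker, and ChannelOutboundInvoker interfaces. Event propagation in the Pipeline is carried out by ChannelHandlerContext, which passes each event from one node to the next.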
ChannelHandlerContext interface

```java
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    Channel channel();

    EventExecutor executor();

    String name();

    ChannelHandler handler();

    boolean isRemoved();

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    ChannelPipeline pipeline();

    ByteBufAllocator alloc();
}
```
AttributeMap interface

Extending AttributeMap means that a ChannelHandlerContext node can store custom attributes.

```java
public interface AttributeMap {

    <T> Attribute<T> attr(AttributeKey<T> key);

    <T> boolean hasAttr(AttributeKey<T> key);
}
```
ChannelInboundInvoker interface

Extending ChannelInboundInvoker means that a node can propagate inbound events.

```java
public interface ChannelInboundInvoker {

    ChannelInboundInvoker fireChannelRegistered();

    ChannelInboundInvoker fireChannelUnregistered();

    ChannelInboundInvoker fireChannelActive();

    ChannelInboundInvoker fireChannelInactive();

    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    ChannelInboundInvoker fireUserEventTriggered(Object event);

    ChannelInboundInvoker fireChannelRead(Object msg);

    ChannelInboundInvoker fireChannelReadComplete();

    ChannelInboundInvoker fireChannelWritabilityChanged();
}
```
ChannelOutboundInvoker interface

Extending ChannelOutboundInvoker means that a node can trigger outbound operations.

```java
public interface ChannelOutboundInvoker {

    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    ChannelFuture disconnect();
    ChannelFuture disconnect(ChannelPromise promise);

    ChannelFuture close();
    ChannelFuture close(ChannelPromise promise);

    ChannelFuture deregister();
    ChannelFuture deregister(ChannelPromise promise);

    ChannelOutboundInvoker read();

    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);

    ChannelPromise newPromise();

    ChannelProgressivePromise newProgressivePromise();

    ChannelFuture newSucceededFuture();

    ChannelFuture newFailedFuture(Throwable cause);

    ChannelPromise voidPromise();
}
```
TailContext & HeadContext

Next, let's look at the head and tail nodes of the Pipeline.

TailContext node

TailContext is the tail node. It is an inbound node and mainly performs the cleanup work at the end of the Pipeline's data flow: it logs exceptions that no handler dealt with and releases messages that no handler consumed.

```java
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        // inbound = true, outbound = false
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    ...

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    ...
}

// Defined in DefaultChannelPipeline
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

// Defined in DefaultChannelPipeline
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

// Defined in AbstractChannelHandlerContext
final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}
```
AbstractChannelHandlerContext

AbstractChannelHandlerContext is the abstract implementation of ChannelHandlerContext:

```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    // Links to the neighbouring nodes in the pipeline
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    private final boolean inbound;
    private final boolean outbound;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private final boolean ordered;

    final EventExecutor executor;

    ...

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    ...
}
```
DefaultChannelHandlerContext

```java
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // The inbound/outbound flags are derived from the handler's type
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}
```
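The `instanceof` checks above mean that a handler implementing both interfaces is flagged as inbound and outbound at the same time. A minimal sketch with hypothetical marker interfaces (`SketchHandler`, `SketchInboundHandler`, `SketchOutboundHandler` are stand-ins, not Netty types) shows the three possible combinations:

```java
/** Hypothetical stand-ins for the handler type hierarchy. */
interface SketchHandler {}
interface SketchInboundHandler extends SketchHandler {}
interface SketchOutboundHandler extends SketchHandler {}

class SketchDecoder implements SketchInboundHandler {}
class SketchEncoder implements SketchOutboundHandler {}
/** A duplex handler implements both interfaces. */
class SketchCodec implements SketchInboundHandler, SketchOutboundHandler {}

class HandlerFlags {
    static boolean isInbound(SketchHandler handler) {
        return handler instanceof SketchInboundHandler;
    }

    static boolean isOutbound(SketchHandler handler) {
        return handler instanceof SketchOutboundHandler;
    }

    public static void main(String[] args) {
        SketchHandler[] handlers = { new SketchDecoder(), new SketchEncoder(), new SketchCodec() };
        for (SketchHandler h : handlers) {
            System.out.println(h.getClass().getSimpleName()
                    + " inbound=" + isInbound(h) + " outbound=" + isOutbound(h));
        }
    }
}
```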
HeadContext

HeadContext is the head node. Its constructor flags it as outbound (inbound = false, outbound = true), although the class implements both ChannelOutboundHandler and ChannelInboundHandler. It passes inbound events on to the next node and delegates outbound operations to the low-level socket operations of Unsafe.

```java
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        // inbound = false, outbound = true
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    // Outbound operations are delegated to Unsafe

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    // Inbound events are forwarded to the next node

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}
```
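Adding Pipeline nodes

Having covered the basic structure of the Pipeline, we now look at how a node (that is, a Handler) is added to it. The process has three main steps:
Check whether the handler has already been added
Create the node and link it into the list
Invoke the handler-added callback
Take this common piece of code as an example: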
```java
ServerBootstrap b = new ServerBootstrap();
b.group(group)
        .channel(NioServerSocketChannel.class)
        .localAddress(new InetSocketAddress(port))
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(serverHandler);
            }
        });

ChannelFuture f = b.bind().sync();
```
We start from the ChannelPipeline.addLast() method:
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h : handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 1. Check whether the handler has already been added
            checkMultiplicity(handler);

            // 2. Create the node and link it into the list
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);

            // 3. Invoke the handler-added callback, deferred if the channel
            //    is not registered yet or we are not on the event loop
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                                " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        // Insert the new node just before tail
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...
            remove0(ctx);
            ...
        }
    }

    ...
}
```
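The first two steps, the multiplicity check and the pointer updates of addLast0, can be tried out in isolation with a toy model. The sketch below is plain Java, not Netty code: `SketchHandler`, `AddLastPipeline`, and the use of IllegalStateException in place of ChannelPipelineException are all assumptions made for illustration, and the callback step is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handler with a sharable flag and an "already added" marker. */
class SketchHandler {
    final String name;
    final boolean sharable;
    boolean added;

    SketchHandler(String name, boolean sharable) {
        this.name = name;
        this.sharable = sharable;
    }
}

/** Toy pipeline showing the duplicate check and the tail insertion. */
class AddLastPipeline {
    private static final class Ctx {
        final SketchHandler handler;
        Ctx prev;
        Ctx next;

        Ctx(SketchHandler handler) { this.handler = handler; }
    }

    private final Ctx head = new Ctx(null);
    private final Ctx tail = new Ctx(null);

    AddLastPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    synchronized AddLastPipeline addLast(SketchHandler handler) {
        checkMultiplicity(handler);

        // Same four pointer updates as addLast0: insert just before tail
        Ctx newCtx = new Ctx(handler);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    /** A non-sharable handler may be added only once, across all pipelines. */
    private static void checkMultiplicity(SketchHandler handler) {
        if (!handler.sharable && handler.added) {
            throw new IllegalStateException(handler.name + " is not sharable and was already added");
        }
        handler.added = true;
    }

    /** Handler names between head and tail, in pipeline order. */
    synchronized List<String> names() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            names.add(c.handler.name);
        }
        return names;
    }
}

class AddLastDemo {
    public static void main(String[] args) {
        SketchHandler shared = new SketchHandler("shared", true);
        SketchHandler single = new SketchHandler("single", false);
        AddLastPipeline first = new AddLastPipeline().addLast(single).addLast(shared);
        AddLastPipeline second = new AddLastPipeline().addLast(shared);
        System.out.println(first.names() + " " + second.names());
        try {
            second.addLast(single); // rejected: already added to the first pipeline
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the `added` marker lives on the handler rather than on the pipeline, the check also catches reuse of one non-sharable instance across different pipelines.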
Let's look at the setAddComplete() method:
```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    final void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            // Never overwrite REMOVE_COMPLETE; otherwise retry the CAS until it succeeds
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    ...

    final void setAddPending() {
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
        assert updated;
    }

    ...
}
```
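The state handling above is a small compare-and-set state machine. The following sketch reproduces it with the JDK's `AtomicIntegerFieldUpdater`. The class name `StateNode`, the numeric values of the constants, and the body of `setRemoved()` are assumptions made for illustration, since the excerpt above does not show them.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Toy node that tracks its handler state with CAS updates. */
class StateNode {
    // Assumed values; only the names come from the code above
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<StateNode> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StateNode.class, "state");

    // The updater requires a volatile int field
    private volatile int state = INIT;

    /** Succeeds only from INIT; returns whether the transition happened. */
    boolean setAddPending() {
        return STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
    }

    /** Moves to ADD_COMPLETE unless the node has already been removed. */
    void setAddComplete() {
        for (;;) {
            int oldState = state;
            if (oldState == REMOVE_COMPLETE
                    || STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    /** Assumed implementation: a plain volatile write. */
    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }
}

class StateDemo {
    public static void main(String[] args) {
        StateNode node = new StateNode();
        node.setAddPending();
        node.setAddComplete();
        System.out.println(node.state() == StateNode.ADD_COMPLETE);

        node.setRemoved();
        node.setAddComplete(); // no effect: a removed node stays removed
        System.out.println(node.state() == StateNode.REMOVE_COMPLETE);
    }
}
```

The loop in `setAddComplete()` retries because another thread may change the state between the read and the CAS. The early return on REMOVE_COMPLETE keeps a late "add complete" from reviving a node that was removed in the meantime.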
The handlerAdded method of the user-defined Handler is then called back:
```java
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.printf("ServerHandler added ....");
    }

    ...
}
```
ChannelInitializer

The most common use of the handlerAdded() callback is ChannelInitializer, which we use to add our own ChannelHandlers. Once the ChannelInitializer itself has been added to the pipeline, its initChannel method is called back.
ChannelInitializer is a special ChannelInboundHandler that provides a convenient way to initialize a Channel once it has been registered with its EventLoop.
```java
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();

    // Implemented by the user to add handlers to the channel's pipeline
    protected abstract void initChannel(C ch) throws Exception;

    ...

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        // Only the first caller for a given context runs the initialization
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                // The initializer removes itself once it has done its job
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
}
```
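The `putIfAbsent` check in initChannel(ChannelHandlerContext) can be read as a guard against re-entrance: while one initialization is in progress for a context, a second trigger for the same context is ignored. The sketch below isolates that idea in plain Java. `ReentranceGuard` and its fields are hypothetical, and the nested call merely simulates a second trigger arriving during initialization.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy version of the putIfAbsent guard; not Netty's ChannelInitializer. */
class ReentranceGuard {
    private final ConcurrentMap<Object, Boolean> initMap = new ConcurrentHashMap<>();

    int bodyRuns = 0;            // how many times the initialization body ran
    boolean nestedResult = true; // result of the simulated re-entrant call

    /** Returns true only for the call that actually ran the initialization. */
    boolean init(Object ctx) {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                bodyRuns++;
                // Simulate a second trigger while initialization is still running
                nestedResult = init(ctx);
            } finally {
                // Clean up the marker, as remove(ctx) does above
                initMap.remove(ctx);
            }
            return true;
        }
        return false;
    }
}

class ReentranceDemo {
    public static void main(String[] args) {
        ReentranceGuard guard = new ReentranceGuard();
        boolean outer = guard.init(new Object());
        System.out.println("outer=" + outer
                + " nested=" + guard.nestedResult
                + " bodyRuns=" + guard.bodyRuns);
    }
}
```

In this toy the marker is removed in `finally`, so a later, non-nested call would run the body again. The real initializer avoids that by also removing itself from the pipeline.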
The context(handler) call used above walks the ChannelHandlerContext nodes to find the one that wraps the given ChannelHandler instance:
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            // Identity comparison: the node must wrap this exact instance
            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }

    ...
}
```
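Besides addLast, the Pipeline also offers addFirst, addBefore, addAfter, and similar methods. Their logic is much the same, and they are left for you to explore.

Removing Pipeline nodes

Having covered how nodes are added, in this section we look at how nodes are removed from the Pipeline.
One of Netty's most important features is that Handlers are pluggable, so a pipeline can be woven dynamically. For example, when a connection is first established it may need to be authenticated. Once authentication succeeds, the corresponding context can be removed, and the authentication handler is no longer invoked when the pipeline propagates later events.
Below is the simplest possible authentication Handler. The first packet carries the credentials: if verification succeeds, the Handler removes itself; otherwise it closes the connection.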
```java
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {

    ...

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(data)) {
            // Authenticated: this handler is no longer needed
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        // ... (verification logic omitted in the original)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("AuthHandler has been removed ! ");
    }
}
```
Let's look at the remove method in DefaultChannelPipeline:
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));

        return this;
    }

    ...

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    ...

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        // head and tail can never be removed
        assert ctx != head && ctx != tail;

        synchronized (this) {
            // Unlink the node from the list
            remove0(ctx);

            // Invoke the handler-removed callback, deferred if the channel
            // is not registered yet or we are not on the event loop
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

    ...

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

    ...

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    ...
}
```
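The lookup-then-unlink sequence of remove can be exercised with a toy list as well. The sketch below is plain Java under simplifying assumptions: nodes are identified by a name string rather than by handler instance, the removal callback is left out, and `RemovablePipeline` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/** Toy pipeline that stores handler names only; not Netty's implementation. */
class RemovablePipeline {
    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) { this.name = name; }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    RemovablePipeline() {
        head.next = tail;
        tail.prev = head;
    }

    synchronized RemovablePipeline addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Finds the node by name or throws, then unlinks it from its neighbours. */
    synchronized RemovablePipeline remove(String name) {
        Node node = head.next;
        while (node != tail && !node.name.equals(name)) {
            node = node.next;
        }
        if (node == tail) {
            throw new NoSuchElementException(name);
        }
        // Same two pointer updates as remove0
        node.prev.next = node.next;
        node.next.prev = node.prev;
        return this;
    }

    /** Names of the user nodes between head and tail. */
    synchronized List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            names.add(n.name);
        }
        return names;
    }
}

class RemoveDemo {
    public static void main(String[] args) {
        RemovablePipeline pipeline = new RemovablePipeline().addLast("auth").addLast("business");
        pipeline.remove("auth"); // e.g. after the first packet was verified
        System.out.println(pipeline.names()); // prints [business]
    }
}
```

Thanks to the fixed head and tail nodes, the unlink step never has to check for a missing neighbour.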
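That concludes our analysis of the removal logic.

Summary

In this part we analyzed how the Pipeline is created, and looked at its linked-list structure and the data structure of each node. We also analyzed how nodes are added to and removed from the Pipeline. Next, we will analyze how the Pipeline propagates events.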
References