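Earlier, we analyzed how Netty's EventLoop is created and started. Next, we look at two other important Netty components: ChannelHandler and Pipeline. Together they are responsible for propagating I/O events in Netty; let's see how that is implemented.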
Netty version: 4.1.30
When we covered Channel creation earlier, we mentioned the Pipeline only in passing, in the AbstractChannel constructor. Now let's dig into how it works.
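Overview

Netty channel lifecycle

Earlier, when analyzing the Netty channel source code, we covered how a Channel is created, initialized, registered, and bound. In Netty, a channel's lifecycle is as follows: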
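ChannelRegistered: the Channel has been registered with an EventLoop
ChannelActive: the Channel is active, connected to a remote peer, and can now send and receive data
ChannelInactive: the connection has been closed
ChannelUnregistered: the Channel has been deregistered from its EventLoop

Netty channelHandler

Every change in a Channel's state produces a corresponding event and triggers the matching method in ChannelHandler. For example, once a Channel has been successfully registered with an EventLoop, the channelRegistered(…) method of the ChannelHandler is invoked to handle that event.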
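In the ChannelHandler UML above, the two most important ChannelHandlers are:

ChannelInboundHandler: handles inbound data and state changes
ChannelOutboundHandler: handles outbound data and lets outbound operations be intercepted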
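Netty ChannelPipeline

As we saw when analyzing Channel creation, every newly created Channel is assigned a new ChannelPipeline. A ChannelPipeline is a chain of ChannelHandler instances that intercepts the inbound and outbound events flowing through a Channel, as shown in the figure: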
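A Channel contains one ChannelPipeline. Internally, the ChannelPipeline is a doubly linked list made up of ChannelHandlerContext nodes, with two fixed nodes at its ends: HeadContext and TailContext. A user-defined ChannelHandler is wrapped in a ChannelHandlerContext to become a Pipeline node, and then takes part in intercepting and handling the inbound events, outbound events, and associated data flows triggered throughout the Channel's lifecycle.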
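Depending on where an event originates, it is handled by either a ChannelInboundHandler (inbound handler) or a ChannelOutboundHandler (outbound handler). Afterwards, by calling a method on the ChannelHandlerContext implementation, the event is forwarded to the next ChannelHandler of the same supertype, as shown in the figure: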
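The direction rule above can be modeled in a few lines of plain Java. This is only a sketch under stated assumptions, not Netty's implementation: the class `DirectionSketch`, its `Node` type, and the handler names are all hypothetical, and each node simply carries the two boolean markers that decide whether it takes part in inbound or outbound traversal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of event direction in a pipeline; not Netty's implementation. */
final class DirectionSketch {

    /** One node of the doubly linked list; the flags mimic inbound/outbound markers. */
    static final class Node {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Node prev;
        Node next;

        Node(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    /** Links the given nodes in order and returns them as a list. */
    static List<Node> link(Node... nodes) {
        for (int i = 1; i < nodes.length; i++) {
            nodes[i - 1].next = nodes[i];
            nodes[i].prev = nodes[i - 1];
        }
        return Arrays.asList(nodes);
    }

    /** Inbound events walk forward and visit only nodes marked inbound. */
    static List<String> inboundPath(Node first) {
        List<String> path = new ArrayList<>();
        for (Node n = first; n != null; n = n.next) {
            if (n.inbound) {
                path.add(n.name);
            }
        }
        return path;
    }

    /** Outbound events walk backward and visit only nodes marked outbound. */
    static List<String> outboundPath(Node last) {
        List<String> path = new ArrayList<>();
        for (Node n = last; n != null; n = n.prev) {
            if (n.outbound) {
                path.add(n.name);
            }
        }
        return path;
    }

    /** Hypothetical pipeline: two inbound handlers, one outbound, one that is both. */
    static List<Node> sample() {
        return link(
                new Node("decoder", true, false),
                new Node("encoder", false, true),
                new Node("logger", true, true),
                new Node("business", true, false));
    }

    public static void main(String[] args) {
        List<Node> nodes = sample();
        System.out.println("inbound:  " + inboundPath(nodes.get(0)));
        System.out.println("outbound: " + outboundPath(nodes.get(nodes.size() - 1)));
    }
}
```

The point of the model is that a single linked list serves both directions; only the traversal direction and the per-node marker differ.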
Pipeline UML

Let's first look at the class diagram of ChannelPipeline and ChannelHandlerContext. Both implement the ChannelInboundInvoker and ChannelOutboundInvoker interfaces.
Pipeline initialization

The AbstractChannel constructor is as follows:
```java
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
```
The DefaultChannelPipeline constructor is as follows:
```java
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
```
As we can see, the Pipeline is a doubly linked list. Right after initialization, the Pipeline contains only two nodes, as shown in the figure:
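To make the two-sentinel structure concrete, here is a minimal JDK-only model. It is an illustration rather than Netty's code: `SentinelListSketch`, `Node`, and `names()` are hypothetical names, and the `addLast` method only demonstrates why fixed head and tail nodes make insertion free of null checks.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a pipeline's sentinel-based doubly linked list; not Netty's code. */
final class SentinelListSketch {

    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    SentinelListSketch() {
        // A fresh pipeline holds only the two sentinels, linked to each other.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a node just before the tail sentinel; no null checks are needed. */
    void addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
    }

    /** Returns the node names from head to tail. */
    List<String> names() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        SentinelListSketch pipeline = new SentinelListSketch();
        System.out.println(pipeline.names());
        pipeline.addLast("decoder");
        pipeline.addLast("business");
        System.out.println(pipeline.names());
    }
}
```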
Next, let's look at the object that makes up the Pipeline's nodes: ChannelHandlerContext.
ChannelHandlerContext

ChannelHandlerContext extends the AttributeMap, ChannelInboundInvoker, and ChannelOutboundInvoker interfaces. Event propagation in the Pipeline is carried out by ChannelHandlerContext, which passes each event from one node to the next.
ChannelHandlerContext interface

```java
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    Channel channel();

    EventExecutor executor();

    String name();

    ChannelHandler handler();

    boolean isRemoved();

    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    ChannelPipeline pipeline();

    ByteBufAllocator alloc();
}
```
AttributeMap interface

Implementing the AttributeMap interface means that a ChannelHandlerContext node can store custom attributes.
```java
public interface AttributeMap {

    <T> Attribute<T> attr(AttributeKey<T> key);

    <T> boolean hasAttr(AttributeKey<T> key);
}
```
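The idea behind a typed attribute map can be sketched with JDK classes alone. The code below is a simplified stand-in, not Netty's DefaultAttributeMap: `AttributeMapSketch` and `Key` are hypothetical, and an `AtomicReference` plays the role that `Attribute<T>` plays in the interface above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

/** Simplified model of a typed attribute map; not Netty's DefaultAttributeMap. */
final class AttributeMapSketch {

    /** A typed key; the type parameter ties the key to the type of its value. */
    static final class Key<T> {
        final String name;

        Key(String name) {
            this.name = name;
        }
    }

    // Keys compare by identity here, so two keys with the same name stay distinct.
    private final ConcurrentMap<Key<?>, AtomicReference<?>> attrs = new ConcurrentHashMap<>();

    /** Returns the value holder for the key, creating an empty one on first access. */
    @SuppressWarnings("unchecked")
    <T> AtomicReference<T> attr(Key<T> key) {
        return (AtomicReference<T>) attrs.computeIfAbsent(key, k -> new AtomicReference<Object>());
    }

    /** Reports whether a holder has already been created for the key. */
    <T> boolean hasAttr(Key<T> key) {
        return attrs.containsKey(key);
    }

    public static void main(String[] args) {
        Key<String> user = new Key<>("user");
        AttributeMapSketch map = new AttributeMapSketch();
        System.out.println("before: " + map.hasAttr(user));
        map.attr(user).set("alice");
        System.out.println("after:  " + map.hasAttr(user) + " -> " + map.attr(user).get());
    }
}
```

Because the key carries the value type, callers read the attribute back without casting.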
ChannelInboundInvoker interface

Implementing the ChannelInboundInvoker interface means that a node can be used to propagate inbound events.
```java
public interface ChannelInboundInvoker {

    ChannelInboundInvoker fireChannelRegistered();

    ChannelInboundInvoker fireChannelUnregistered();

    ChannelInboundInvoker fireChannelActive();

    ChannelInboundInvoker fireChannelInactive();

    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    ChannelInboundInvoker fireUserEventTriggered(Object event);

    ChannelInboundInvoker fireChannelRead(Object msg);

    ChannelInboundInvoker fireChannelReadComplete();

    ChannelInboundInvoker fireChannelWritabilityChanged();
}
```
ChannelOutboundInvoker interface

Implementing the ChannelOutboundInvoker interface means that a node can be used to handle outbound events.
```java
public interface ChannelOutboundInvoker {

    ChannelFuture bind(SocketAddress localAddress);

    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    ChannelFuture connect(SocketAddress remoteAddress);

    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    ChannelFuture disconnect();

    ChannelFuture disconnect(ChannelPromise promise);

    ChannelFuture close();

    ChannelFuture close(ChannelPromise promise);

    ChannelFuture deregister();

    ChannelFuture deregister(ChannelPromise promise);

    ChannelOutboundInvoker read();

    ChannelFuture write(Object msg);

    ChannelFuture write(Object msg, ChannelPromise promise);

    ChannelOutboundInvoker flush();

    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    ChannelFuture writeAndFlush(Object msg);

    ChannelPromise newPromise();

    ChannelProgressivePromise newProgressivePromise();

    ChannelFuture newSucceededFuture();

    ChannelFuture newFailedFuture(Throwable cause);

    ChannelPromise voidPromise();
}
```
TailContext & HeadContext

Next, let's look at the head and tail nodes of the Pipeline.
TailContext node

TailContext is the tail node and is of the inbound type. Its main job is the cleanup work at the end of the Pipeline's data flow.
```java
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    ...

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    ...
}

// In DefaultChannelPipeline: an exception reached the tail without being handled
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

// In DefaultChannelPipeline: a message reached the tail without being consumed
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

// In AbstractChannelHandlerContext
final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}
```
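The cleanup role of the tail node boils down to "log, then always release". The sketch below models that pattern with a hand-rolled counter; it assumes nothing about Netty's ReferenceCountUtil beyond what the code above shows, and `TailReleaseSketch` and `CountedMessage` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of a tail node releasing unhandled messages; not Netty's code. */
final class TailReleaseSketch {

    /** Hypothetical reference-counted message, standing in for a pooled buffer. */
    static final class CountedMessage {
        private final AtomicInteger refCnt = new AtomicInteger(1);

        int refCnt() {
            return refCnt.get();
        }

        /** Drops one reference; returns true when the count reaches zero. */
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released too many times");
            }
            return remaining == 0;
        }
    }

    /** Logs the discarded message, then releases it even if logging fails. */
    static void onUnhandledInboundMessage(Object msg) {
        try {
            System.out.println("Discarded inbound message of type " + msg.getClass().getSimpleName());
        } finally {
            // Only reference-counted messages need releasing; others are left to the GC.
            if (msg instanceof CountedMessage) {
                ((CountedMessage) msg).release();
            }
        }
    }

    public static void main(String[] args) {
        CountedMessage msg = new CountedMessage();
        onUnhandledInboundMessage(msg);
        System.out.println("refCnt after tail: " + msg.refCnt());
    }
}
```

Putting the release in a `finally` block is the design choice worth noting: a message that nobody consumed is freed regardless of what happens while reporting it.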
AbstractChannelHandlerContext

AbstractChannelHandlerContext is the abstract implementation of ChannelHandlerContext:
```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    private final boolean inbound;
    private final boolean outbound;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private final boolean ordered;

    final EventExecutor executor;

    ...

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    ...
}
```
DefaultChannelHandlerContext

```java
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}
```
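HeadContext

HeadContext is the head node. It is flagged as outbound (its constructor passes inbound = false, outbound = true), although the class implements both ChannelOutboundHandler and ChannelInboundHandler. It propagates events and performs the low-level socket operations through Unsafe.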
```java
final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}
```
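Adding a node to the Pipeline

We have covered the basic structure of the Pipeline. Next, let's look at how the Pipeline adds a node (that is, a Handler). The process has three main steps: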
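1. Check whether the handler has already been added
2. Create the node and link it into the list
3. Fire the "handler added" callback

Take this common piece of code as an example: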
```java
ServerBootstrap b = new ServerBootstrap();
b.group(group)
 .channel(NioServerSocketChannel.class)
 .localAddress(new InetSocketAddress(port))
 .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
 .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast(serverHandler);
     }
 });

ChannelFuture f = b.bind().sync();
```
We start from the ChannelPipeline.addLast() method:
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...
            remove0(ctx);
            ...
        }
    }

    ...
}
```
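The three steps (duplicate check, linking, callback) and the "not registered yet" branch can be modeled without Netty. The sketch below is a simplification under stated assumptions: `DeferredAddSketch`, `Handler`, and `register()` are hypothetical, the handler list replaces the linked list, and the hop onto the event loop thread is left out entirely.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model of addLast's three steps with deferred callbacks; not Netty's code. */
final class DeferredAddSketch {

    /** Hypothetical handler with the two flags the duplicate check relies on. */
    static final class Handler {
        final String name;
        final boolean sharable;
        boolean added;

        Handler(String name, boolean sharable) {
            this.name = name;
            this.sharable = sharable;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();
    private boolean registered;

    /** Records the callbacks that have fired, in order. */
    final List<String> log = new ArrayList<>();

    void addLast(Handler h) {
        // Step 1: a non-sharable handler may only ever be added once.
        if (!h.sharable && h.added) {
            throw new IllegalStateException(h.name + " is not sharable and was already added");
        }
        h.added = true;

        // Step 2: link the handler into the pipeline.
        handlers.add(h);

        // Step 3: fire the callback now, or queue it until the channel is registered.
        Runnable callback = () -> log.add("handlerAdded:" + h.name);
        if (registered) {
            callback.run();
        } else {
            pendingCallbacks.add(callback);
        }
    }

    /** Marks the channel as registered and drains the queued callbacks in order. */
    void register() {
        registered = true;
        Runnable task;
        while ((task = pendingCallbacks.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        DeferredAddSketch pipeline = new DeferredAddSketch();
        pipeline.addLast(new Handler("a", false));
        System.out.println("before register: " + pipeline.log);
        pipeline.register();
        pipeline.addLast(new Handler("b", false));
        System.out.println("after register:  " + pipeline.log);
    }
}
```

In this model a handler added before registration is already part of the list, but its callback waits in the queue, which mirrors the split between linking the node and notifying the handler.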
Let's look at the setAddComplete() method:
```java
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    ...

    final void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    ...

    final void setAddPending() {
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
        assert updated;
    }

    ...
}
```
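The compare-and-set loop can be reproduced with the JDK's `AtomicIntegerFieldUpdater`. The following is a stand-alone sketch, not the Netty class: `HandlerStateSketch` is hypothetical, the numeric values of the state constants are chosen arbitrarily for the example, and `setRemoved()` is reduced to a plain volatile write.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Simplified model of a handler-state machine driven by CAS; not Netty's code. */
final class HandlerStateSketch {

    // State values are arbitrary in this sketch; only their distinctness matters.
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "state");

    // The updater requires a volatile int field.
    private volatile int state = INIT;

    /** Moves INIT to ADD_PENDING; returns false if the state was anything else. */
    boolean setAddPending() {
        return STATE.compareAndSet(this, INIT, ADD_PENDING);
    }

    /** Moves any state to ADD_COMPLETE, except that REMOVE_COMPLETE is never overwritten. */
    void setAddComplete() {
        for (;;) {
            int oldState = state;
            if (oldState == REMOVE_COMPLETE || STATE.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
            // Another thread changed the state between the read and the CAS; retry.
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        HandlerStateSketch ctx = new HandlerStateSketch();
        ctx.setAddPending();
        ctx.setAddComplete();
        System.out.println("state after add: " + ctx.state());
        ctx.setRemoved();
        ctx.setAddComplete();
        System.out.println("state after late add-complete: " + ctx.state());
    }
}
```

The retry loop exists because a removal may race with the add callback; once a context is marked removed, a late "add complete" has to leave that state untouched.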
The handlerAdded method of the user-defined Handler is then called back:
```java
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {

    ...

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.printf("ServerHandler added ....");
    }

    ...
}
```
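ChannelInitializer

The most common scenario in which handlerAdded() is called back on a ChannelHandler is when a ChannelInitializer is used to add our own ChannelHandlers. Once the ChannelInitializer itself has been added, its initChannel method is called back.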
Next, let's look at the ChannelInitializer class. It is a special ChannelInboundHandler that provides a convenient way to initialize a Channel once it has been registered with an EventLoop.
```java
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();

    protected abstract void initChannel(C ch) throws Exception;

    ...

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
}
```
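The `putIfAbsent` call above acts as a re-entrancy guard: whoever inserts the entry first runs the initialization, and any nested attempt for the same context backs off. A minimal JDK-only sketch of that guard follows; `InitOnceSketch` and its `Runnable` parameter are hypothetical stand-ins for the initializer and the user's initChannel body.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of a putIfAbsent-based init guard; not Netty's ChannelInitializer. */
final class InitOnceSketch {

    private final ConcurrentMap<Object, Boolean> initMap = new ConcurrentHashMap<>();

    /** Counts how many times the initialization body actually ran. */
    final AtomicInteger initCalls = new AtomicInteger();

    /**
     * Runs userInit for ctx unless an initialization for the same ctx is already
     * in progress. Returns true if this call performed the initialization.
     */
    boolean initChannel(Object ctx, Runnable userInit) {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                initCalls.incrementAndGet();
                userInit.run();
            } finally {
                // Mirrors the cleanup step: the guard entry is dropped once init is done.
                initMap.remove(ctx);
            }
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        InitOnceSketch initializer = new InitOnceSketch();
        Object ctx = new Object();
        boolean ran = initializer.initChannel(ctx, () -> {
            // A nested attempt for the same context is rejected by the guard.
            boolean nested = initializer.initChannel(ctx, () -> { });
            System.out.println("nested attempt ran: " + nested);
        });
        System.out.println("outer attempt ran: " + ran + ", init calls: " + initializer.initCalls.get());
    }
}
```

Note that in this sketch the guard only protects against overlapping attempts. In the class above, the initializer is also removed from the pipeline in the same cleanup step, which is what stops later events from reaching it again.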
The context() method walks the ChannelHandlerContext nodes to find the one that wraps a given ChannelHandler instance:
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }

    ...
}
```
Besides addLast, the Pipeline also offers addFirst, addBefore, addAfter, and similar methods. Their logic is much the same, so you can study them on your own.
Removing a node from the Pipeline

Having covered how nodes are added to the Pipeline, in this section we look at how they are removed.
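One of Netty's most notable features is that handlers are pluggable, so a pipeline can be woven together dynamically. For example, a connection may need to pass authentication when it is first established. Once authentication succeeds, that context can be removed, and the next time the pipeline propagates an event the authentication handler is no longer invoked.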
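Below is the simplest possible authentication Handler. The first packet to arrive carries the credentials: if verification passes, the Handler removes itself; otherwise it closes the connection.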
```java
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {

    ...

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        // The first packet carries the credentials
        if (verify(data)) {
            // Authenticated: this handler is no longer needed
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        // The verification logic is omitted in the original; supply the real check here
        throw new UnsupportedOperationException("verify is not implemented");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("AuthHandler has been removed ! ");
    }
}
```
Let's look at the remove method in DefaultChannelPipeline:
```java
public class DefaultChannelPipeline implements ChannelPipeline {

    ...

    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

    ...

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }

    ...

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;

        synchronized (this) {
            remove0(ctx);

            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

    ...

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

    ...

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    ...
}
```
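To see the effect of removal on later events, here is a small JDK-only model of a handler that unlinks itself after the first message. It is a sketch under stated assumptions rather than Netty's behavior in full: `SelfRemovalSketch`, its `Handler` interface, the "secret" credential, and the boolean "pass it on" return value are all hypothetical, and closing the connection on failure is reduced to a log entry.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a handler removing itself from a pipeline; not Netty's code. */
final class SelfRemovalSketch {

    /** Returns true to pass the message on to the next node, false to consume it. */
    interface Handler {
        boolean onRead(SelfRemovalSketch pipeline, Node self, String msg);
    }

    static final class Node {
        final String name;
        final Handler handler;
        Node prev;
        Node next;

        Node(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }
    }

    final Node head = new Node("head", null);
    final Node tail = new Node("tail", null);
    final List<String> log = new ArrayList<>();

    SelfRemovalSketch() {
        head.next = tail;
        tail.prev = head;
    }

    Node addLast(String name, Handler handler) {
        Node node = new Node(name, handler);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return node;
    }

    /** Unlinks the node by re-pointing its neighbours at each other. */
    void remove(Node node) {
        if (node == head || node == tail) {
            throw new IllegalArgumentException("sentinels cannot be removed");
        }
        node.prev.next = node.next;
        node.next.prev = node.prev;
    }

    /** Delivers a message to the nodes between the sentinels until one consumes it. */
    void fireRead(String msg) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (!n.handler.onRead(this, n, msg)) {
                return;
            }
        }
    }

    /** Hypothetical pipeline: an auth step in front of a business handler. */
    static SelfRemovalSketch sample() {
        SelfRemovalSketch p = new SelfRemovalSketch();
        p.addLast("auth", (pipe, self, msg) -> {
            if ("secret".equals(msg)) {
                pipe.remove(self);
                pipe.log.add("auth removed");
            } else {
                pipe.log.add("auth rejected");
            }
            return false; // the credential packet is consumed either way
        });
        p.addLast("business", (pipe, self, msg) -> {
            pipe.log.add("business got " + msg);
            return true;
        });
        return p;
    }

    public static void main(String[] args) {
        SelfRemovalSketch p = sample();
        p.fireRead("secret");
        p.fireRead("hello");
        System.out.println(p.log);
    }
}
```

After the first message the auth node is no longer reachable from head, so the second message goes straight to the business handler.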
That wraps up our analysis of the removal logic.
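Summary

In this installment we analyzed how the Pipeline is created, and looked at its linked-list structure and the data structure of each node. We also analyzed how the Pipeline adds nodes and how it removes them. Next, we will analyze how the Pipeline propagates events.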
References