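In the previous chapters we analyzed Netty's three core components (Channel, EventLoop, and Pipeline) and gained a solid understanding of how Netty works. Building on that, this article analyzes how Netty handles a new incoming connection once the server has started.
Netty version: 4.1.30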
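This article is divided into the following four parts:
New connection detection
NioSocketChannel creation
NioSocketChannel initialization and registration
NioSocketChannel registering OP_READ interest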
New connection detection
Earlier, in the source analysis of the EventLoop startup process, we walked through the following code:
```java
public final class NioEventLoop extends SingleThreadEventLoop {
    ...
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        ...
        try {
            ...
            // OP_ACCEPT (server channel) or OP_READ (client channel) is ready.
            // readyOps == 0 is also handled to work around a possible JDK bug
            // that could otherwise lead to a spin loop.
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
        ...
    }
    ...
}
```
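To make the readiness check concrete, here is a minimal sketch that uses only the JDK NIO API, not Netty. It registers a non-blocking server channel for OP_ACCEPT, connects a client over loopback, and tests `readyOps` with the same bit mask as above. The class name `AcceptReadinessSketch`, the ephemeral port, and the 5-second deadline are assumptions made for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptReadinessSketch {

    /** Returns true if the selector reports OP_ACCEPT after a client connects. */
    static boolean detectsAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free port (arbitrary choice for this sketch).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Poll the selector until the pending connection is reported.
                long deadline = System.currentTimeMillis() + 5000;
                while (System.currentTimeMillis() < deadline) {
                    selector.select(200);
                    int readyOps = key.readyOps();
                    if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
                        return true;
                    }
                }
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("OP_ACCEPT detected: " + detectsAccept());
    }
}
```

In Netty the equivalent check runs on the boss NioEventLoop thread, and a set OP_ACCEPT bit is what leads to `unsafe.read()`.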
We again take the server-side NioServerSocketChannel as the example. The unsafe instance bound to it is a NioMessageUnsafe, so the unsafe.read() call above dispatches to NioMessageUnsafe.read(), shown below:
```java
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    ...
    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // Accept pending connections and collect them in readBuf.
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // Propagate each accepted channel through the server channel's pipeline.
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    ...
}
```
doReadMessages(...) is implemented as follows:
```java
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    ...
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the connection through the underlying JDK ServerSocketChannel.
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // Wrap the JDK SocketChannel in a Netty NioSocketChannel.
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
    ...
}
```
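The return values 1 and 0 follow directly from how a non-blocking JDK `ServerSocketChannel.accept()` behaves: it returns `null` when no connection is pending. The sketch below reproduces that shape with plain JDK classes. `NonBlockingAcceptSketch`, `acceptOnce`, and `demo` are hypothetical names, and the short retry loop is only there because a connection may take a moment to reach the accept queue.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

public class NonBlockingAcceptSketch {

    /** One accept() attempt: returns 1 if a connection was taken, otherwise 0. */
    static int acceptOnce(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    /** Returns {result with no client, result after a client has connected}. */
    static int[] demo() {
        List<SocketChannel> accepted = new ArrayList<>();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            int before = acceptOnce(server, accepted);
            int after = 0;
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                long deadline = System.currentTimeMillis() + 5000;
                while (after == 0 && System.currentTimeMillis() < deadline) {
                    after = acceptOnce(server, accepted);
                    if (after == 0) {
                        Thread.sleep(10);
                    }
                }
            }
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            for (SocketChannel ch : accepted) {
                try {
                    ch.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup in a sketch.
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before=" + r[0] + ", after=" + r[1]);
    }
}
```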
continueReading() is implemented as follows. Why it returns false in this case will be analyzed separately later:
```java
public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
    private volatile int maxMessagesPerRead;
    private volatile boolean respectMaybeMoreData = true;
    ...
    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        private int maxMessagePerRead;
        private int totalMessages;
        private int totalBytesRead;
        private int attemptedBytesRead;
        private int lastBytesRead;
        private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
            }
        };
        ...
        @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }
        ...
    }
    ...
}
```
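The predicate above can be evaluated in isolation. The sketch below copies only the boolean expression into a hypothetical `ContinueReadingSketch` class. It assumes that the accept path calls `incMessagesRead(...)` but never adds to `totalBytesRead`, which is how the read loop shown earlier appears to behave. Under that assumption the last condition fails and the accept loop stops after one pass. The value 16 for the per-read message limit is illustrative.

```java
public class ContinueReadingSketch {

    /** Same boolean expression as MaxMessageHandle.continueReading(...), with fields passed as arguments. */
    static boolean continueReading(boolean autoRead,
                                   boolean respectMaybeMoreData,
                                   boolean maybeMoreData,
                                   int totalMessages,
                                   int maxMessagePerRead,
                                   int totalBytesRead) {
        return autoRead
                && (!respectMaybeMoreData || maybeMoreData)
                && totalMessages < maxMessagePerRead
                && totalBytesRead > 0;
    }

    public static void main(String[] args) {
        // Accept path (assumed): one connection accepted, no bytes counted.
        System.out.println(continueReading(true, true, true, 1, 16, 0));    // false
        // Byte-read path: some bytes were read and more data may follow.
        System.out.println(continueReading(true, true, true, 1, 16, 1024)); // true
    }
}
```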
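NioSocketChannel creation
The analysis of new connection handling above touched on the creation of NioSocketChannel. Here we look at it in detail. The process is broadly similar to the NioServerSocketChannel creation we analyzed earlier.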
Constructor
First, the NioSocketChannel constructor:
```java
public class NioSocketChannel extends AbstractNioByteChannel
        implements io.netty.channel.socket.SocketChannel {
    ...
    public NioSocketChannel(Channel parent, SocketChannel socket) {
        // parent is the NioServerSocketChannel that accepted this connection.
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }
    ...
}
```
The parent class AbstractNioByteChannel constructor:
```java
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        // A client channel is interested in OP_READ, not OP_ACCEPT.
        super(parent, ch, SelectionKey.OP_READ);
    }
    ...
}
```
The parent class AbstractNioChannel constructor:
```java
public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        // Only stored here; it is applied to the SelectionKey later, in doBeginRead().
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            ...
        }
    }
}
```
The parent class AbstractChannel constructor:
```java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    ...
}
```
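The effect of this constructor chain on the underlying JDK channel can be shown with a small stand-in hierarchy. This is a sketch under stated assumptions: `BaseNioChannel` and `ByteNioChannel` are hypothetical classes that only imitate the two steps visible above (storing `readInterestOp` and switching the channel to non-blocking mode), and they are not Netty types.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public class ChannelCtorSketch {

    /** Hypothetical stand-in for AbstractNioChannel. */
    static class BaseNioChannel {
        final SelectableChannel ch;
        final int readInterestOp;

        BaseNioChannel(SelectableChannel ch, int readInterestOp) {
            this.ch = ch;
            this.readInterestOp = readInterestOp; // stored now, applied to a SelectionKey later
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Hypothetical stand-in for AbstractNioByteChannel: fixes the interest op to OP_READ. */
    static class ByteNioChannel extends BaseNioChannel {
        ByteNioChannel(SocketChannel ch) {
            super(ch, SelectionKey.OP_READ);
        }
    }

    /** Wraps a fresh JDK SocketChannel and reports the resulting state. */
    static String describe() {
        try (SocketChannel socket = SocketChannel.open()) {
            ByteNioChannel c = new ByteNioChannel(socket);
            return "blocking=" + c.ch.isBlocking() + ", readInterestOp=" + c.readInterestOp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // blocking=false, readInterestOp=1
    }
}
```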
ChannelConfig creation
Next, let's look at how NioSocketChannelConfig is created:
```java
public class NioSocketChannel extends AbstractNioByteChannel
        implements io.netty.channel.socket.SocketChannel {
    ...
    // NioSocketChannelConfig is an inner class of NioSocketChannel.
    private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
        ...
        private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
            super(channel, javaSocket);
            calculateMaxBytesPerGatheringWrite();
        }
        ...
    }
    ...
}
```
The parent class DefaultSocketChannelConfig constructor:
```java
public class DefaultSocketChannelConfig extends DefaultChannelConfig
        implements SocketChannelConfig {
    ...
    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;

        // Enable TCP_NODELAY by default where the platform allows it.
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                setTcpNoDelay(true);
            } catch (Exception e) {
                // Ignore.
            }
        }
    }
    ...
}
```
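At the JDK level, enabling TCP_NODELAY comes down to a single socket option. The sketch below sets it on a fresh `SocketChannel` through its `Socket` view and reads it back. `TcpNoDelaySketch` is a hypothetical name, and the sketch assumes the platform allows the option to be set on a socket that is not yet connected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

public class TcpNoDelaySketch {

    /** Sets TCP_NODELAY through the Socket view and returns the value read back from the channel. */
    static boolean enableNoDelay() {
        try (SocketChannel ch = SocketChannel.open()) {
            ch.socket().setTcpNoDelay(true); // disables Nagle's algorithm for this socket
            return ch.getOption(StandardSocketOptions.TCP_NODELAY);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("TCP_NODELAY=" + enableNoDelay());
    }
}
```

Unlike this sketch, the constructor above swallows any exception from `setTcpNoDelay(true)`, so a platform that rejects the option does not prevent the channel from being created.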
NioSocketChannel initialization and registration
The previous section covered how a NioSocketChannel is created. Next we analyze how the newly created NioSocketChannel is registered with a NioEventLoop.
The new connection detection section above contained the following snippet:
```java
private final class NioMessageUnsafe extends AbstractNioUnsafe {
    ...
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    ...
}
```
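This propagates a ChannelRead event through the pipeline. The pipeline here is the one bound to the server channel, that is, NioServerSocketChannel. What does this pipeline look like internally at this point? It consists of the head node, the ServerBootstrapAcceptor, and the tail node (plus the handler configured through handler(...), if one was set).
So where does this ServerBootstrapAcceptor come from?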
Earlier, when analyzing NioServerSocketChannel initialization, we saw the following code:
```java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    ...
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // Add the ServerBootstrapAcceptor from a task on the channel's event loop.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    ...
}
```
ServerBootstrapAcceptor
During NioServerSocketChannel initialization, an inbound handler node, ServerBootstrapAcceptor, is added to the pipeline bound to the NioServerSocketChannel. Its code is as follows:
```java
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    ...
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;
        private final Runnable enableAutoReadTask;

        ServerBootstrapAcceptor(
                final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;

            enableAutoReadTask = new Runnable() {
                @Override
                public void run() {
                    channel.config().setAutoRead(true);
                }
            };
        }

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // msg is the NioSocketChannel created in doReadMessages(...).
            final Channel child = (Channel) msg;

            // Add the user-supplied childHandler (usually a ChannelInitializer).
            child.pipeline().addLast(childHandler);

            // Apply the child options and attributes.
            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                // Register the child channel with an EventLoop from the child (worker) group.
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
        ...
    }
    ...
}
```
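The call `childGroup.register(child)` hands each accepted channel to one event loop in the child group. The sketch below models that hand-off with a simple round-robin pick. `Worker`, `WorkerGroup`, and `assign` are hypothetical names, and round-robin selection is an assumption about the group's default behavior rather than something shown in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ChildGroupSketch {

    /** Hypothetical worker "event loop" that only records which children it owns. */
    static class Worker {
        final int index;
        final List<String> children = new ArrayList<>();

        Worker(int index) {
            this.index = index;
        }
    }

    /** Hypothetical group that picks workers in round-robin order. */
    static class WorkerGroup {
        private final Worker[] workers;
        private final AtomicInteger counter = new AtomicInteger();

        WorkerGroup(int size) {
            workers = new Worker[size];
            for (int i = 0; i < size; i++) {
                workers[i] = new Worker(i);
            }
        }

        Worker next() {
            return workers[Math.floorMod(counter.getAndIncrement(), workers.length)];
        }

        /** Assigns the child to the next worker and returns that worker. */
        Worker register(String child) {
            Worker w = next();
            w.children.add(child);
            return w;
        }
    }

    /** Registers the given number of connections and returns the chosen worker indexes. */
    static List<Integer> assign(int workerCount, int connections) {
        WorkerGroup group = new WorkerGroup(workerCount);
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            chosen.add(group.register("child-" + i).index);
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 4)); // [0, 1, 2, 0]
    }
}
```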
For details on ChannelInitializer, see the earlier Pipeline source analysis article.
The subsequent register logic is broadly similar to the NioServerSocketChannel registration we covered earlier, so we only go over it briefly here.
```java
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    ...
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ...
        AbstractChannel.this.eventLoop = eventLoop;

        // Run register0 directly if already on the event loop thread,
        // otherwise submit it to the event loop as a task.
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

    private void register0(ChannelPromise promise) {
        try {
            ...
            boolean firstRegistration = neverRegistered;
            // Register the underlying JDK channel with the Selector.
            doRegister();
            neverRegistered = false;
            registered = true;

            // Triggers handlerAdded(...), which runs the ChannelInitializer.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();

            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
            ...
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    ...
}
```
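The `inEventLoop()` branch in `register(...)` is the usual way to confine work to one thread: run inline when already on that thread, otherwise enqueue a task. Here is a minimal sketch of the pattern built on a JDK single-thread executor. `Loop`, `register`, and `registerFromCallerThread` are hypothetical names, and the thread name `sketch-loop` is arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RunOnLoopSketch {

    /** Hypothetical single-threaded loop that remembers its own thread. */
    static class Loop {
        private volatile Thread thread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-loop");
            t.setDaemon(true);
            thread = t;
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Runs the task inline on the loop thread, or hands it over to the loop. */
    static void register(Loop loop, Runnable register0) {
        if (loop.inEventLoop()) {
            register0.run();
        } else {
            loop.execute(register0);
        }
    }

    /** Calls register from the caller's thread and returns the name of the thread that ran the task. */
    static String registerFromCallerThread() {
        Loop loop = new Loop();
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        try {
            register(loop, () -> ranOn.complete(Thread.currentThread().getName()));
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(registerFromCallerThread()); // sketch-loop
    }
}
```

For a new connection the caller is the boss event loop thread, which is not the worker loop chosen for the child channel, so the `else` branch is the one taken.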
This calls the doRegister() method, which NioSocketChannel inherits from AbstractNioChannel:
```java
public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // Register with interest ops 0 and attach this Netty channel to the key.
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
    ...
}
```
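Registering with interest ops `0` attaches the channel to the selector without asking for any events yet. The following JDK-only sketch shows the resulting key state. `RegisterZeroOpsSketch` is a hypothetical name, and a plain `Object` stands in for the Netty channel that `doRegister()` passes as the attachment.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class RegisterZeroOpsSketch {

    /** Registers a channel with interest ops 0 and an attachment, then reports the key state. */
    static String describe() {
        Object attachment = new Object(); // stand-in for the Netty channel
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0, attachment);
            return "interestOps=" + key.interestOps()
                    + ", attached=" + (key.attachment() == attachment);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // interestOps=0, attached=true
    }
}
```

The attachment is what lets the event loop map a ready `SelectionKey` back to its channel when it processes selected keys.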
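NioSocketChannel registering OP_READ interest
Continuing from the analysis above: an accepted NioSocketChannel is already active at its first registration, so register0() fires the ChannelActive event. The logic that follows mainly registers read interest (OP_READ) for the NioSocketChannel of the client connection: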
```java
if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}
```
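The ChannelActive event first reaches the pipeline's head node. With autoRead enabled, the head node issues a read() request, which travels back through the pipeline to unsafe.beginRead() and finally reaches doBeginRead(), shown below: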
```java
public abstract class AbstractNioChannel extends AbstractChannel {
    ...
    // Note: doBeginRead() is a method of AbstractNioChannel itself;
    // AbstractNioUnsafe reaches it through beginRead().
    @Override
    protected void doBeginRead() throws Exception {
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        // Add readInterestOp (OP_READ for NioSocketChannel) if it is not set yet.
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    ...
}
```
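The bit manipulation in `doBeginRead()` can be exercised directly against a JDK `SelectionKey`. This sketch registers a channel with interest ops `0`, then applies the same check-and-OR step twice to show that the second call changes nothing. `BeginReadSketch`, `beginRead`, and `demo` are hypothetical names used only here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class BeginReadSketch {

    /** Adds readInterestOp to the key's interest set unless it is already present. */
    static void beginRead(SelectionKey key, int readInterestOp) {
        if (!key.isValid()) {
            return;
        }
        int interestOps = key.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            key.interestOps(interestOps | readInterestOp);
        }
    }

    /** Returns the interest ops before, after the first call, and after the second call. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            SelectionKey key = ch.register(selector, 0);

            int before = key.interestOps();
            beginRead(key, SelectionKey.OP_READ);
            int afterFirst = key.interestOps();
            beginRead(key, SelectionKey.OP_READ);
            int afterSecond = key.interestOps();
            return new int[] {before, afterFirst, afterSecond};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " -> " + r[1] + " -> " + r[2]); // 0 -> 1 -> 1
    }
}
```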
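Summary
Where does Netty detect that a new connection has arrived?
How is a new connection registered with a NioEventLoop thread?
How is a NioSocketChannel initialized and registered?
References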