前面 ,我们大致了解了 Netty 中的几个核心组件。今天我们就来先来介绍 Netty 的网络通信组件,用于执行网络 I/O 操作 —— Channel 。
Netty 版本:4.1.30
概述 数据在网络中总是以字节的形式进行流通。我们在进行网络编程时选用何种传输方式编码(OIO、NIO 等)决定了这些字节的传输方式。
在没有 Netty 之前,为了提升系统的并发能力,从 OIO 切换到 NIO 时,需要对代码进行大量的重构,因为相应的 Java NIO 与 IO API 大不相同。而 Netty 在这些 Java 原生 API 的基础上做了一层封装,对用户提供了高度抽象而又统一的 API,从而让传输方式的切换不在变得困难,只需要直接使用即可,而不需要对整个代码进行重构。
Netty Channel UML netty channel 族如下:
整个族群中,AbstractChannel 是最为关键的一个抽象类,从它继承出了 AbstractNioChannel、AbstractOioChannel、AbstractEpollChannel、LocalChannel、EmbeddedChannel 等类,每个类代表了不同的协议以及相应的 IO 模型。除了 TCP 协议以外,Netty 还支持很多其他的连接协议,并且每种协议还有 NIO (异步 IO) 和 OIO (Old-IO,即传统的阻塞 IO) 版本的区别。不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel:代表异步的客户端 TCP Socket 连接
NioServerSocketChannel:异步的服务器端 TCP Socket 连接
NioDatagramChannel:异步的 UDP 连接
NioSctpChannel:异步的客户端 Sctp 连接
NioSctpServerChannel:异步的 Sctp 服务器端连接
OioSocketChannel:同步的客户端 TCP Socket 连接
OioServerSocketChannel:同步的服务器端 TCP Socket 连接
OioDatagramChannel:同步的 UDP 连接
OioSctpChannel:同步的 Sctp 服务器端连接
OioSctpServerChannel:同步的客户端 TCP Socket 连接
Channel API 我们先来看下最顶层接口 channel 主要的 API,常用的如下:
接口名
描述
eventLoop()
Channel 需要注册到 EventLoop 的多路复用器上,用于处理 I/O 事件,通过 eventLoop () 方法可以获取到 Channel 注册的 EventLoop。EventLoop 本质上就是处理网络读写事件的 Reactor 线程。在 Netty 中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义 NioTask 等任务。
pipeline()
返回 channel 分配的 ChannelPipeline
isActive()
判断 channel 是否激活。激活的意义取决于底层的传输类型。例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的
localAddress()
返回本地的 socket 地址
remoteAddress()
返回远程的 socket 地址
flush()
将之前已写的数据冲刷到底层 Channel 上去
write(Object msg)
请求将当前的 msg 通过 ChannelPipeline 写入到目标 Channel 中。注意,write 操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用 flush 操作才会被写入到 Channel 中,发送给对方。
writeAndFlush()
等同于调用 write () 并接着调用 flush ()
metadate()
熟悉 TCP 协议的读者可能知道,当创建 Socket 的时候需要指定 TCP 参数,例如接收和发送的 TCP 缓冲区大小,TCP 的超时时间。是否重用地址等。在 Netty 中,每个 Channel 对应一个物理链接,每个连接都有自己的 TCP 参数配置。所以,Channel 会聚合一个 ChannelMetadata 用来对 TCP 参数提供元数据描述信息,通过 metadata () 方法就可以获取当前 Channel 的 TCP 参数配置。
read()
从当前的 Channel 中读取数据到第一个 inbound 缓冲区中,如果数据被成功读取,触发 ChannelHandler.channelRead (ChannelHandlerContext,Object) 事件。读取操作 API 调用完成后,紧接着会触发 ChannelHander.channelReadComplete(ChannelHandlerContext)事件,这样业务的 ChannelHandler 可以决定是否需要继续读取数据。如果已经有操作请求被挂起,则后续的读操作会被忽略。
close(ChannelPromise promise)
主动关闭当前连接,通过 ChannelPromise 设置操作结果并进行结果通知,无论操作是否成功,都可以通过 ChannelPromise 获取操作结果。该操作会级联触发 ChannelPipeline 中所有 ChannelHandler 的 ChannelHandler.close (ChannelHandlerContext,ChannelPromise) 事件。
parent()
对于服务端 Channel 而言,它的父 Channel 为空;对于客户端 Channel,它的父 Channel 就是创建它的 ServerSocketChannel。
id()
返回 ChannelId 对象,ChannelId 是 Channel 的唯一标识。
Channel 创建 对 Netty Channel API 以及相关的类有了一个初步了解之后,接下来我们来详细了解一下在 Netty 的启动过程中 Channel 是如何创建的。服务端 Channel 的创建过程,主要分为四个步骤:1)Channel 创建;2)Channel 初始化;3)Channel 注册;4)Channel 绑定。
我们以下面的代码为例进行解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 EventLoopGroup bossGroup = new NioEventLoopGroup ();EventLoopGroup workGroup = new NioEventLoopGroup ();ServerBootstrap b = new ServerBootstrap ();b.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress (port)) .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000 ) .attr(AttributeKey.newInstance("UserId" ), "60293" ) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new ChannelInitializer <SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync();f.channel().closeFuture().sync();
调用 channel () 接口设置 AbstractBootstrap 的成员变量 channelFactory ,该变量顾名思义就是用于创建 channel 的工厂类。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 ... public B channel (Class<? extends C> channelClass) { if (channelClass == null ) { throw new NullPointerException ("channelClass" ); } return channelFactory(new ReflectiveChannelFactory <C>(channelClass)); } ... public B channelFactory (ChannelFactory<? extends C> channelFactory) { if (channelFactory == null ) { throw new NullPointerException ("channelFactory" ); } if (this .channelFactory != null ) { throw new IllegalStateException ("channelFactory set already" ); } this .channelFactory = channelFactory; return (B) this ; } ...
channelFactory 设置为 ReflectiveChannelFactory ,在我们这个例子中 clazz 为 NioServerSocketChannel ,我们可以看到其中有个 newChannel () 接口,通过反射的方式来调用,这个接口的调用处我们后面会介绍到。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ReflectiveChannelFactory <T extends Channel > implements ChannelFactory <T> { private final Class<? extends T > clazz; public ReflectiveChannelFactory (Class<? extends T> clazz) { if (clazz == null ) { throw new NullPointerException ("clazz" ); } this .clazz = clazz; } @Override public T newChannel () { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException ("Unable to create Channel from class " + clazz, t); } } @Override public String toString () { return StringUtil.simpleClassName(clazz) + ".class" ; } }
接下来我们来看下 NioServerSocketChannel 的构造函数,主要就是:
生成 ServerSocketChannel 对象。NioServerSocketChannel 创建时,首先使用 SelectorProvider 的 openServerSocketChannel 打开服务器套接字通道。SelectorProvider 是 Java 的 NIO 提供的抽象类,是选择器和可选择通道的服务提供者。具体的实现类有 SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。选择器的主要工作是根据操作系统类型和版本选择合适的 Provider:如果 LInux 内核版本 >=2.6 则,具体的 SelectorProvider 为 EPollSelectorProvider,否则为默认的 PollSelectorProvider。
设置 ServerSocketChannelConfig 成员变量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static ServerSocketChannel newSocket (SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException ("Failed to open a server socket." , e); } } private final ServerSocketChannelConfig config;public NioServerSocketChannel () { this (newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel (SelectorProvider provider) { this (newSocket(provider)); } public NioServerSocketChannel (ServerSocketChannel channel) { super (null , channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig (this , javaChannel().socket()); }
AbstractNioChannel 的构造器如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected AbstractNioChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent); this .ch = ch; this .readInterestOp = readInterestOp; try { ch.configureBlocking(false ); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket." , e2); } } throw new ChannelException ("Failed to enter non-blocking mode." , e); } }
在 AbstractChannel 构造器中,会设 Channel 关联的三个核心对象:ChannelId、ChannelPipeline、Unsafe。
初始化 ChannelId,ChannelId 是一个全局唯一的值;
创建 NioMessageUnsafe 实例,该类为 Channel 提供了用于完成网络通讯相关的底层操作,如 connect (),read (),register (),bind (),close () 等;
为 Channel 创建 DefaultChannelPipeline,初始事件传播管道。关于 Pipeline 的分析,请看 后文 的分析。
1 2 3 4 5 6 7 8 9 protected AbstractChannel (Channel parent) { this .parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
从 NioServerSocketChannelConfig 的构造函数追溯下去,在 DefaultChannelConfig 会设置 channel 成员变量。
1 2 3 4 5 6 7 8 9 public DefaultChannelConfig (Channel channel) { this (channel, new AdaptiveRecvByteBufAllocator ()); } protected DefaultChannelConfig (Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this .channel = channel; }
以上就是 channel 创建的过程,总结一下:
通过 ReflectiveChannelFactory 工厂类,以反射的方式对 channel 进行创建;
channel 创建的过程中,会创建四个重要的对象:ChannelId、ChannelConfig、ChannelPipeline、Unsafe。
Channel 初始化 主要分为以下两步:
将启动器(Bootstrap)设置的选项和属性设置到 NettyChannel 上面
向 Pipeline 添加初始化 Handler,供注册后使用
我们从 AbstractBootstrap 的 bind () 接口进去,调用链:bind () —> doBind (localAddress) —> initAndRegister () —> init (Channel channel),我们看下 ServerBootstrap 中 init () 接口的实现:
1 2 3 4 5 6 7 8 9 10 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { .... }
初始化 Channel,我们来重点看下 init (channel) 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 void init (Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel (final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable () { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor ( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
对于新连接接入器 ServerBootstrapAcceptor 的分析 ,请查看 后文
Channel 注册 在 channel 完成创建和初始化之后,接下来就需要将其注册到事件轮循器 Selector 上去。我们回到 initAndRegister 接口上去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final ChannelFuture initAndRegister () { ... ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
最终会向下调用到 SingleThreadEventLoop 中的 register 接口:
如何调用到这里,里面的细节需要等到后面文章讲到 MultithreadEventExecutorGroup 再详细说明
1 2 3 4 5 6 7 @Override public ChannelFuture register (final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise" ); promise.channel().unsafe().register(this , promise); return promise; }
代码跟踪下去,直到 AbstractChannel 中的 AbstractUnsafe 这个类中的 register 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public final void register (EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null ) { throw new NullPointerException ("eventLoop" ); } if (isRegistered()) { promise.setFailure(new IllegalStateException ("registered to an event loop already" )); return ; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException ("incompatible event loop type: " + eventLoop.getClass().getName())); return ; } AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}" , AbstractChannel.this , t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
register0 接口主要分为以下三段逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
我们来看 AbstractNioChannel 中的 doRegister () 接口,最终调用的就是 Java JDK 底层的 NIO API 来注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
Channel 绑定 在完成创建、初始化以及注册之后,接下来就是 Channel 绑定操作。
本小节涉及到的 pipeline 事件传播机制,我们放到后面的文章中去讲解。
从启动器的 bind () 接口开始,往下调用 doBind () 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
doBind 方法又会调用 doBind0 () 方法,在 doBind0 () 方法中会通过 EventLoop 去执行 channel 的 bind () 任务,关于 EventLoop 的 execute 接口的分析,请看后面的 文章 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable () { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
doBind0 () 方法往下会条用到 pipeline.bind(localAddress, promise);
方法,通过 pipeline 的传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.bind () 方法,这个方法主要做两件事情:
调用 doBind ():调用底层 JDK API 进行 Channel 的端口绑定。
调用 pipeline.fireChannelActive ():
关于 Pipeline 的传播机制,请看 后文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { .... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
我们这里看服务端 NioServerSocketChannel 实现的 doBind 方法,最终会调用 JDK 底层 NIO Channel 的 bind 方法:
1 2 3 4 5 6 7 8 @Override protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
调用 pipeline.fireChannelActive (),开始传播 active 事件,pipeline 首先就会调用 HeadContext 节点进行事件传播,会调用到 DefaultChannelPipeline.HeadContext.channelActive () 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } private void readIfIsAutoRead () { if (channel.config().isAutoRead()) { channel.read(); } }
channel.read () 方法往下会调用到 AbstractChannelHandlerContext.read () 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public ChannelHandlerContext read () { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null ) { next.invokeReadTask = task = new Runnable () { @Override public void run () { next.invokeRead(); } }; } executor.execute(task); } return this ; }
通过 pipeline 的事件传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.beginRead () 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public final void beginRead () { assertEventLoop(); if (!isActive()) { return ; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
我们看下 AbstractNioChannel 对 doBeginRead 接口的实现逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
总结 至此,我们就分析完了 Channel 的创建、初始化、注册、绑定的流程。其中涉及到的 EventLoopGroup 和 Pipeline 事件传播机制的知识点,我们放到后面的文章中去讲解。
参考资料