前面 ,我们大致了解了Netty中的几个核心组件。今天我们就来先来介绍Netty的网络通信组件,用于执行网络I/O操作 —— Channel 。
Netty版本:4.1.30
概述 数据在网络中总是以字节的形式进行流通。我们在进行网络编程时选用何种传输方式编码(OIO、NIO等)决定了这些字节的传输方式。
在没有Netty之前,为了提升系统的并发能力,从OIO切换到NIO时,需要对代码进行大量的重构,因为相应的Java NIO 与 IO API大不相同。而Netty在这些Java原生API的基础上做了一层封装,对用户提供了高度抽象而又统一的API,从而让传输方式的切换不在变得困难,只需要直接使用即可,而不需要对整个代码进行重构。
Netty Channel UML netty channel族如下:
整个族群中,AbstractChannel 是最为关键的一个抽象类,从它继承出了AbstractNioChannel、AbstractOioChannel、AbstractEpollChannel、LocalChannel、EmbeddedChannel等类,每个类代表了不同的协议以及相应的IO模型。除了 TCP 协议以外,Netty 还支持很多其他的连接协议,并且每种协议还有 NIO(异步 IO) 和 OIO(Old-IO,即传统的阻塞 IO) 版本的区别. 不同协议不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:
NioSocketChannel:代表异步的客户端 TCP Socket 连接 NioServerSocketChannel:异步的服务器端 TCP Socket 连接 NioDatagramChannel:异步的 UDP 连接 NioSctpChannel:异步的客户端 Sctp 连接 NioSctpServerChannel:异步的 Sctp 服务器端连接 OioSocketChannel:同步的客户端 TCP Socket 连接 OioServerSocketChannel:同步的服务器端 TCP Socket 连接 OioDatagramChannel:同步的 UDP 连接 OioSctpChannel:同步的 Sctp 服务器端连接 OioSctpServerChannel:同步的客户端 TCP Socket 连接 Channel API 我们先来看下最顶层接口 channel 主要的API,常用的如下:
接口名 描述 eventLoop() Channel需要注册到EventLoop的多路复用器上,用于处理I/O事件,通过eventLoop()方法可以获取到Channel注册的EventLoop。EventLoop本质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NioTask等任务。 pipeline() 返回channel分配的ChannelPipeline isActive() 判断channel是否激活。激活的意义取决于底层的传输类型。例如,一个Socket传输一旦连接到了远程节点便是活动的,而一个Datagram传输一旦被打开便是活动的 localAddress() 返回本地的socket地址 remoteAddress() 返回远程的socket地址 flush() 将之前已写的数据冲刷到底层Channel上去 write(Object msg) 请求将当前的msg通过ChannelPipeline写入到目标Channel中。注意,write操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush操作才会被写入到Channel中,发送给对方。 writeAndFlush() 等同于调用write()并接着调用flush() metadate() 熟悉TCP协议的读者可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TCP缓冲区大小,TCP的超时时间。是否重用地址等。在Netty中,每个Channel对应一个物理链接,每个连接都有自己的TCP参数配置。所以,Channel会聚合一个ChannelMetadata用来对TCP参数提供元数据描述信息,通过metadata()方法就可以获取当前Channel的TCP参数配置。 read() 从当前的Channel中读取数据到第一个inbound缓冲区中,如果数据被成功读取,触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。读取操作API调用完成后,紧接着会触发ChannelHander.channelReadComplete(ChannelHandlerContext)事件,这样业务的ChannelHandler可以决定是否需要继续读取数据。如果已经有操作请求被挂起,则后续的读操作会被忽略。 close(ChannelPromise promise) 主动关闭当前连接,通过ChannelPromise设置操作结果并进行结果通知,无论操作是否成功,都可以通过ChannelPromise获取操作结果。该操作会级联触发ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。 parent() 对于服务端Channel而言,它的父Channel为空;对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel。 id() 返回ChannelId对象,ChannelId是Channel的唯一标识。
Channel创建 对Netty Channel API以及相关的类有了一个初步了解之后,接下来我们来详细了解一下在Netty的启动过程中Channel是如何创建的。服务端Channel的创建过程,主要分为四个步骤:1)Channel创建;2)Channel初始化;3)Channel注册;4)Channel绑定。
我们以下面的代码为例进行解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class ) // 配置监听地址 .localAddress (new InetSocketAddress (port )) // 设置服务器通道的选项,设置TCP 属性 .option (ChannelOption .SO_KEEPALIVE , Boolean .TRUE ) // 设置建立连接后的客户端通道的选项 .childOption (ChannelOption .CONNECT_TIMEOUT_MILLIS , 5000) // channel 属性,便于保存用户自定义数据 .attr(AttributeKey.newInstance("UserId"), "60293") .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync();
调用channel()接口设置 AbstractBootstrap 的成员变量 channelFactory ,该变量顾名思义就是用于创建channel的工厂类。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ... public B channel (Class<? extends C> channelClass) { if (channelClass == null ) { throw new NullPointerException("channelClass" ); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } ... public B channelFactory (ChannelFactory<? extends C> channelFactory) { if (channelFactory == null ) { throw new NullPointerException("channelFactory" ); } if (this .channelFactory != null ) { throw new IllegalStateException("channelFactory set already" ); } this .channelFactory = channelFactory; return (B) this ; } ...
channelFactory 设置为 ReflectiveChannelFactory ,在我们这个例子中 clazz 为 NioServerSocketChannel ,我们可以看到其中有个 newChannel() 接口,通过反射的方式来调用,这个接口的调用处我们后面会介绍到。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ReflectiveChannelFactory <T extends Channel > implements ChannelFactory <T > { private final Class<? extends T> clazz; public ReflectiveChannelFactory (Class<? extends T> clazz) { if (clazz == null ) { throw new NullPointerException("clazz" ); } this .clazz = clazz; } @Override public T newChannel () { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString () { return StringUtil.simpleClassName(clazz) + ".class" ; } }
接下来我们来看下 NioServerSocketChannel 的构造函数,主要就是:
生成ServerSocketChannel对象。NioServerSocketChannel创建时,首先使用SelectorProvider的openServerSocketChannel打开服务器套接字通道。SelectorProvider是Java的NIO提供的抽象类,是选择器和可选择通道的服务提供者。具体的实现类有SelectorProviderImpl,EPollSelectorProvide,PollSelectorProvider。选择器的主要工作是根据操作系统类型和版本选择合适的Provider:如果LInux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider。 设置 ServerSocketChannelConfig 成员变量。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private static ServerSocketChannel newSocket (SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a server socket." , e); } } private final ServerSocketChannelConfig config;public NioServerSocketChannel () { this (newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel (SelectorProvider provider) { this (newSocket(provider)); } public NioServerSocketChannel (ServerSocketChannel channel) { super (null , channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this , javaChannel().socket()); }
AbstractNioChannel 的构造器如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected AbstractNioChannel (Channel parent, SelectableChannel ch, int readInterestOp) { super (parent); this .ch = ch; this .readInterestOp = readInterestOp; try { ch.configureBlocking(false ); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket." , e2); } } throw new ChannelException("Failed to enter non-blocking mode." , e); } }
在 AbstractChannel 构造器中,会设Channel关联的三个核心对象:ChannelId、ChannelPipeline、Unsafe。
初始化ChannelId,ChannelId是一个全局唯一的值; 创建 NioMessageUnsafe 实例,该类为Channel提供了用于完成网络通讯相关的底层操作,如connect(),read(),register(),bind(),close()等; 为Channel创建DefaultChannelPipeline,初始事件传播管道。关于Pipeline的分析,请看 后文 的分析。 1 2 3 4 5 6 7 8 9 protected AbstractChannel (Channel parent) { this .parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
从 NioServerSocketChannelConfig 的构造函数追溯下去,在 DefaultChannelConfig 会设置channel成员变量。
1 2 3 4 5 6 7 8 9 public DefaultChannelConfig (Channel channel) { this (channel, new AdaptiveRecvByteBufAllocator()); } protected DefaultChannelConfig (Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this .channel = channel; }
以上就是channel创建的过程,总结一下:
通过 ReflectiveChannelFactory 工厂类,以反射的方式对channel进行创建; channel创建的过程中,会创建四个重要的对象:ChannelId、ChannelConfig、ChannelPipeline、Unsafe。 Channel初始化 主要分为以下两步:
将启动器(Bootstrap)设置的选项和属性设置到NettyChannel上面 向Pipeline添加初始化Handler,供注册后使用 我们从 AbstractBootstrap 的 bind() 接口进去,调用链:bind() —> doBind(localAddress) —> initAndRegister() —> init(Channel channel),我们看下 ServerBootstrap 中 init() 接口的实现:
1 2 3 4 5 6 7 8 9 10 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { .... }
初始化Channel,我们来重点看下 init(channel) 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 void init (Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings ("unchecked" ) AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel (final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
对于新连接接入器 ServerBootstrapAcceptor 的分析 ,请查看 后文
Channel注册 在channel完成创建和初始化之后,接下来就需要将其注册到事件轮循器Selector上去。我们回到 initAndRegister 接口上去:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final ChannelFuture initAndRegister () { ... ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
最终会向下调用到 SingleThreadEventLoop 中的 register 接口:
如何调用到这里,里面的细节需要等到后面文章讲到 MultithreadEventExecutorGroup 再详细说明
1 2 3 4 5 6 7 @Override public ChannelFuture register (final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise" ); promise.channel().unsafe().register(this , promise); return promise; }
代码跟踪下去,直到 AbstractChannel 中的 AbstractUnsafe 这个类中的 register 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @Override public final void register (EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null ) { throw new NullPointerException("eventLoop" ); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already" )); return ; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return ; } AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run () { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}" , AbstractChannel.this , t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
register0接口主要分为以下三段逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
我们来看 AbstractNioChannel 中的 doRegister()接口,最终调用的就是Java JDK底层的NIO API来注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected void doRegister () throws Exception { boolean selected = false ; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0 , this ); return ; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true ; } else { throw e; } } } }
Channel绑定 在完成创建、初始化以及注册之后,接下来就是Channel绑定操作。
本小节涉及到的pipeline事件传播机制,我们放到后面的文章中去讲解。
从启动器的bind()接口开始,往下调用 doBind() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { .... } }
doBind 方法又会调用 doBind0() 方法,在doBind0()方法中会通过EventLoop去执行channel的bind()任务,关于EventLoop的execute接口的分析,请看后面的 文章 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
doBind0() 方法往下会条用到 pipeline.bind(localAddress, promise);
方法,通过pipeline的传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.bind() 方法,这个方法主要做两件事情:
调用doBind():调用底层JDK API进行Channel的端口绑定。 调用pipeline.fireChannelActive(): 关于Pipeline的传播机制,请看 后文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { .... boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
我们这里看服务端 NioServerSocketChannel 实现的 doBind方法,最终会调用JDK 底层 NIO Channel的bind方法:
1 2 3 4 5 6 7 8 @Override protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
调用 pipeline.fireChannelActive(),开始传播active事件,pipeline首先就会调用HeadContext节点进行事件传播,会调用到 DefaultChannelPipeline.HeadContext.channelActive() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } private void readIfIsAutoRead () { if (channel.config().isAutoRead()) { channel.read(); } }
channel.read() 方法往下会调用到 AbstractChannelHandlerContext.read() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public ChannelHandlerContext read () { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null ) { next.invokeReadTask = task = new Runnable() { @Override public void run () { next.invokeRead(); } }; } executor.execute(task); } return this ; }
通过pipeline的事件传播机制,最终会调用到 AbstractChannel.AbstractUnsafe.beginRead() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Override public final void beginRead () { assertEventLoop(); if (!isActive()) { return ; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run () { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
我们看下 AbstractNioChannel 对doBeginRead接口的实现逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
总结 至此,我们就分析完了Channel的创建、初始化、注册、绑定的流程。其中涉及到的EventLoopGroup和Pipeline事件传播机制的知识点,我们放到后面的文章中去讲解。
参考资料