前面,我们学习了 Netty 的基本 原理和架构 ,今天我们来大致了解一下 Netty 中的各个组件。
同我们 前面 学习 IO 与 NIO 一样的套路,我们先通过 echo 服务 demo 来学习 netty 的使用。
开发环境
编写 Echo Server 代码
Netty 服务端的开发主要有以下两个步骤:
- 至少有一个 ChannelHandler —— 这个主要用于处理从 client 端接受到的信息,是主要的业务逻辑处理类。
- Bootstrapping —— 用于配置服务的启动代码。最简单的就是,监听一个端口。
实现 EchoServerHandler 逻辑
服务端用于处理入站的网络请求,因此我们需要实现接口类 ChannelInboundHandler,它里面定义了用于
处理入站请求的一些接口。由于我们这个例子比较简单,只需要用到它的几个方法即可,因此我们的实现类只需要继承子类 ChannelInboundHandlerAdapter 即可,它默认实现了 ChannelInboundHandler 中的接口。
有几个方法需要留意一下:
- channelRead() —— 每当有入站请求来临时,该方法都会被调用
- channelReadComplete() —— 对 channelRead () 的最后一次调用是当前批处理中的最后一条消息时,该方法会被调用
- exceptionCaught() —— 在 read 操作执行期间,如果发生异常,该方法则会被调用。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
@Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
|
实现 EchoServer 逻辑
接下来,我们使用 ServerBootstrap 来实现服务端的开发,主要以下两点:
- 绑定一个监听端口
- 配置 Channels,当有入站消息到达时,通知 EchoServerHeadler 实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package nia.chapter2.echoserver;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
public class EchoServer { private final int port;
public EchoServer(int port) { this.port = port; }
public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>"); return; } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); }
public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind().sync(); System.out.println(EchoServer.class.getName() + " started and listening for connections on " + f.channel().localAddress()); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
|
编写 Echo Client 代码
Echo Client 代码逻辑:
- 连接服务器
- 发送一个或多个消息
- 等待服务端返回同样的消息
- 关闭连接
实现 EchoClientHandler 逻辑
同服务端一样,客户端也要实现 ChannelInboundHandler 接口,客户端需要继承 SimpleChannelInboundHandler ,有以下三个接口需要重写:
- channelActive() —— 当连接建立时,调用该方法
- channelRead0() —— 当接收到服务端的消息时,调用该方法
- exceptionCaught() —— 当有异常发生时,执行该方法
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package nia.chapter2.echoclient;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil;
@Sharable public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8)); }
@Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
|
实现 EchoClient 逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package nia.chapter2.echoclient;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient { private final String host; private final int port;
public EchoClient(String host, int port) { this.host = host; this.port = port; }
public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>" ); return; }
final String host = args[0]; final int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }
|
为什么 client 使用 SimpleChannelInboundHandler ,而 server 端使用 ChannelInboundHandlerAdapter 的区别 ?
在 Client 中,channelRead0 () 完成时,消息已经处理完。当该方法返回时,SimpleChannelInboundHandler 会释放保存该消息的 ByteBuf 的内存引用。
在 Server 中,接收完消息后,还需要将消息回传给客户端,并且 wirte () 是异步的,当 channelRead () 完成时,消息内存还没有被释放。需要等到 channelReadComplete () 中调用 writeAndFlush () 才会被释放。
Netty 组件
这里我们先简要了解一下以下几个组件的作用,留个映像,后面我们会对每个组件做详细深入。
Channel
同我们前面学习 Java NIO Channel 类似,Netty Channel 在此基础上做了高度抽象的封装,主要用于网络 I/O 数据的基本操作,如 bind ()、connect ()、read ()、write () 等。
EventLoop
在网络连接的整个生命周期内,发生的所有事件的处理主要有 EventLoop 来处理
ChannelFuture
在 Netty 中,I/O 操作主要都是异步进行,当操作发生时,我们需要通过一种方式来知道操作在未来的时间点的执行结果。ChannelFutrue 中的 addListener () 方法,可以注册监听器 ChannelFutureListener,当操作完成时,监听器可以主动通知我们。
ChannelHandler
channelHandler 主要用于应用程序中的业务逻辑的处理,网络中的进入与出去的数据都经由它处理,当有事件发生时,channelHandler 会被触发执行。
ChannelPipeline
ChannelPipeline 提供了一种容器,用于定义数据流入与流出过程中的处理流程。可以将 Pipeline 看作是一条流水线,原始的原料 (字节流) 进来,经过加工,最后输出。
Bootstrapping
主要用于配置服务端或客户端的 Netty 程序的启动信息。
ByteBuf
字节数据容器,提供比 Java NIO ByteBuffer 更好的的 API。
参考资料