前面,我们学习了 Netty 的基本 原理和架构 ,今天我们来大致了解一下 Netty 中的各个组件。

同我们 前面 学习 IO 与 NIO 一样的套路,我们先通过 echo 服务 demo 来学习 netty 的使用。

Netty Echo Service

开发环境

编写 Echo Server 代码

Netty 服务端的开发主要有以下两个步骤:

  • 至少有一个 ChannelHandler —— 这个主要用于处理从 client 端接受到的信息,是主要的业务逻辑处理类。
  • Bootstrapping —— 用于配置服务的启动代码。最简单的就是,监听一个端口。

实现 EchoServerHandler 逻辑

服务端用于处理入站的网络请求,因此我们需要实现接口类 ChannelInboundHandler,它里面定义了用于

处理入站请求的一些接口。由于我们这个例子比较简单,只需要用到它的几个方法即可,因此我们的实现类只需要继承子类 ChannelInboundHandlerAdapter 即可,它默认实现了 ChannelInboundHandler 中的接口。

有几个方法需要留意一下:

  • channelRead() —— 每当有入站请求来临时,该方法都会被调用
  • channelReadComplete() —— 对 channelRead () 的最后一次调用是当前批处理中的最后一条消息时,该方法会被调用
  • exceptionCaught() —— 在 read 操作执行期间,如果发生异常,该方法则会被调用。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// @Sharable象征着该ChannelHandler实例在多个channels之间可以被安全地分享

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
// 打印消息日志
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
// 将入站消息发送给发送者,但不冲刷出站消息
ctx.write(in);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将待处理的消息冲刷到远程节点上,并关闭Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印堆栈信息
cause.printStackTrace();
// 关闭channel
ctx.close();
}
}

实现 EchoServer 逻辑

接下来,我们使用 ServerBootstrap 来实现服务端的开发,主要以下两点:

  • 绑定一个监听端口
  • 配置 Channels,当有入站消息到达时,通知 EchoServerHeadler 实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package nia.chapter2.echoserver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}

public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
// 创建 EventLoopGroup 实例
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建 ServerBootstrap 实例
ServerBootstrap b = new ServerBootstrap();
b.group(group)
// 执行Channel的类型为:NioServerSocketChannel
.channel(NioServerSocketChannel.class)
// 绑定端口
.localAddress(new InetSocketAddress(port))
// 将EchoServerHandler添加到ChannelPipeline中去
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// EchoServerHandler有注解 @Sharable,因此我们总是可以使用改实例
ch.pipeline().addLast(serverHandler);
}
});
// 异步绑定服务,sync() 用于等待绑定完成
ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() +
" started and listening for connections on " + f.channel().localAddress());
// 获取Channel的CloseFutrue,在完成之前一直处于阻塞状态
f.channel().closeFuture().sync();
} finally {
// 关闭所有 EventLoopGroup,并释放所有资源
group.shutdownGracefully().sync();
}
}
}

编写 Echo Client 代码

Echo Client 代码逻辑:

  1. 连接服务器
  2. 发送一个或多个消息
  3. 等待服务端返回同样的消息
  4. 关闭连接

实现 EchoClientHandler 逻辑

同服务端一样,客户端也要实现 ChannelInboundHandler 接口,客户端需要继承 SimpleChannelInboundHandler ,有以下三个接口需要重写:

  • channelActive() —— 当连接建立时,调用该方法
  • channelRead0() —— 当接收到服务端的消息时,调用该方法
  • exceptionCaught() —— 当有异常发生时,执行该方法

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package nia.chapter2.echoclient;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

// @Sharable用于标记EchoClientHandler,可以在channel中分享使用
@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
public void channelActive(ChannelHandlerContext ctx) {
// 一旦连接建立,将会发送消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8));
}

@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
// 记录收到的消息
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印堆栈信息
cause.printStackTrace();
// 关闭channel
ctx.close();
}
}

实现 EchoClient 逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package nia.chapter2.echoclient;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;


public class EchoClient {
private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start()
throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建Bootstrap
Bootstrap b = new Bootstrap();
// 指定使用 NioEventLoopGroup 去处理客户端事件
b.group(group)
// 指定channel类型为NIO
.channel(NioSocketChannel.class)
// 指定要连接的远程地址
.remoteAddress(new InetSocketAddress(host, port))
// 将 EchoClientHandler 添加到 pipeline中
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
// 连接远程地址,一直等待直到连接完成
ChannelFuture f = b.connect().sync();
// 在channel关闭前一直处于block状态
f.channel().closeFuture().sync();
} finally {
// 关闭线程池,释放所有资源
group.shutdownGracefully().sync();
}
}

public static void main(String[] args)
throws Exception {
if (args.length != 2) {
System.err.println("Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>"
);
return;
}

final String host = args[0];
final int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}

为什么 client 使用 SimpleChannelInboundHandler ,而 server 端使用 ChannelInboundHandlerAdapter 的区别 ?

在 Client 中,channelRead0 () 完成时,消息已经处理完。当该方法返回时,SimpleChannelInboundHandler 会释放保存该消息的 ByteBuf 的内存引用。

在 Server 中,接收完消息后,还需要将消息回传给客户端,并且 wirte () 是异步的,当 channelRead () 完成时,消息内存还没有被释放。需要等到 channelReadComplete () 中调用 writeAndFlush () 才会被释放。

Netty 组件

这里我们先简要了解一下以下几个组件的作用,留个映像,后面我们会对每个组件做详细深入。

Channel

同我们前面学习 Java NIO Channel 类似,Netty Channel 在此基础上做了高度抽象的封装,主要用于网络 I/O 数据的基本操作,如 bind ()、connect ()、read ()、write () 等。

EventLoop

在网络连接的整个生命周期内,发生的所有事件的处理主要有 EventLoop 来处理

ChannelFuture

在 Netty 中,I/O 操作主要都是异步进行,当操作发生时,我们需要通过一种方式来知道操作在未来的时间点的执行结果。ChannelFutrue 中的 addListener () 方法,可以注册监听器 ChannelFutureListener,当操作完成时,监听器可以主动通知我们。

ChannelHandler

channelHandler 主要用于应用程序中的业务逻辑的处理,网络中的进入与出去的数据都经由它处理,当有事件发生时,channelHandler 会被触发执行。

ChannelPipeline

ChannelPipeline 提供了一种容器,用于定义数据流入与流出过程中的处理流程。可以将 Pipeline 看作是一条流水线,原始的原料 (字节流) 进来,经过加工,最后输出。

Bootstrapping

主要用于配置服务端或客户端的 Netty 程序的启动信息。

ByteBuf

字节数据容器,提供比 Java NIO ByteBuffer 更好的的 API。

参考资料