Netty入门

什么是Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

HelloWorld

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestNettyServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup()) // 创建NioEventGroup 线程池 + selector
.channel(NioServerSocketChannel.class) //选择socket 的实现类
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 为socketchannel 添加处理器
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder()); // ByteBuf 解码为 String
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 业务处理器 处理上一个处理器的结果
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8080);
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestNettyClient {
public static void main(String[] args) throws Exception {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new StringEncoder());
}
})
.connect("localhost", 8080)
.sync()
.channel()
.writeAndFlush(new Date() + ": hello world");
}
}

概念理解

  • channel

​ 数据传输的通道

  • msg

    流动的数据,最开始接收是是bytebuf经过pipeline的handler处理加工成其他的对象,输出的时数据对象再编程bytebuf输出

  • handler

    处理数据的工具,可以将多个handler放入pipeline当中,pipeline负责发布事件(读、读取完成…)传递给每一个handler,handler重写自己感兴趣事件的方法进行处理

    handler分为Inbound和Outbound

  • eventLoop

    • 处理数据的工人,每个工人可以处理多个channel的io操作,给工人分配了一个channel后,该channel就一直由该工人处理,相当于工人和channel进行了绑定
    • 工人除了处理io操作,也可以对任务进行处理,每个工人都有自己的任务队列,队列中存放多个channel待处理的任务,任务可以是普通任务和定时任务
    • 工人会按照pipeline的顺序,依次按照handler的代码处理数据,可以为每道工序指定不同的工人

组件

1 EventLoop

事件循环对象

单线程执行器+selector,通过run方法处理Channel上源源不断的IO事件。

继承关系:

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

一组EventLoop,channel通过调用EventLoopGroup的register方法绑定其中一个EventLoop,之后这个channel上的所有io事件都由所绑定的这个eventloop来处理,这样可以保证io事件处理时的线程安全。

继承关系:

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

遍历的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestEventGroupIterate {
public static void main(String[] args) {
DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(2);
System.out.println(eventExecutors.next());
System.out.println(eventExecutors.next());
System.out.println(eventExecutors.next());
iterateEventGroup(eventExecutors);
}

private static void iterateEventGroup(DefaultEventLoopGroup group) {
for (EventExecutor eventLoop : group) {
System.out.println(eventLoop);
}
}
}

输出:

1
2
3
4
5
io.netty.channel.DefaultEventLoop@5eb5c224
io.netty.channel.DefaultEventLoop@53e25b76
io.netty.channel.DefaultEventLoop@5eb5c224
io.netty.channel.DefaultEventLoop@5eb5c224
io.netty.channel.DefaultEventLoop@53e25b76
优雅的关闭

通过shutdownGracefully可以对eventloopgroup进行优雅的关闭,先将eventloopgroup切换到关闭状态,禁止新的任务加入,再处理任务队列中剩余的任务后停止线程的运行,可以确保应用是在整体有序的情况下退出的

例子

NioEventLoop处理IO事件

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class TestEventLoopGroupIO {
public static void main(String[] args) throws Exception {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
ByteBuf byteBuf = msg instanceof ByteBuf ? (ByteBuf)msg : null;
if (byteBuf != null) {
byte[] bytes = new byte[21];
ByteBuf len = byteBuf.readBytes(bytes, 0, byteBuf.readableBytes());
log.debug(new String(bytes));
}
}
});
}
})
.bind(8080).sync();
}
}

客户端:

开启3个客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestEventLoopGroupIOClient {
public static void main(String[] args) throws Exception {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
System.out.println("init....");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();

channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu123456789019999".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
}
}

服务端输出:

1
2
3
4
5
6
21:13:32.551 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu123456789019999
21:13:34.550 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu
21:13:34.960 [nioEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIO - wangwu123456789019999
21:13:36.964 [nioEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIO - wangwu
21:13:37.096 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu123456789019999
21:13:39.106 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu

从输出日志可以看到,不同客户端的channel分别由不同的EventLoop轮流进行处理。

使用非nioworker处理read事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class TestEventLoopGroupIOServer {
public static void main(String[] args) throws Exception {
DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2);// 创建两个defaultEventLoopGroup
new ServerBootstrap()
.group(new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(normalWorkers, "myHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
ByteBuf byteBuf = msg instanceof ByteBuf ? (ByteBuf)msg : null;
if (byteBuf != null) {
byte[] bytes = new byte[21];
ByteBuf len = byteBuf.readBytes(bytes, 0, byteBuf.readableBytes());
log.debug(new String(bytes));
}
}
});
}
})
.bind(8080).sync();
}
}

多次开启客户端发送不同消息,server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75 |wangwu |
+--------+-------------------------------------------------+----------------+
21:57:38.938 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xafe832bd, L:/127.0.0.1:8080 - R:/127.0.0.1:10065] READ COMPLETE
21:57:38.938 [defaultEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIOServer - wangwu
21:57:40.949 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xafe832bd, L:/127.0.0.1:8080 - R:/127.0.0.1:10065] READ: 6B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 61 6e 67 77 75 |wangwu |
+--------+-------------------------------------------------+----------------+
21:57:40.950 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xafe832bd, L:/127.0.0.1:8080 - R:/127.0.0.1:10065] READ COMPLETE
21:57:40.950 [defaultEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIOServer - wangwu
21:58:19.510 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c, L:/127.0.0.1:8080 - R:/127.0.0.1:10096] REGISTERED
21:58:19.510 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c, L:/127.0.0.1:8080 - R:/127.0.0.1:10096] ACTIVE
21:58:19.524 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c, L:/127.0.0.1:8080 - R:/127.0.0.1:10096] READ: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 68 61 6e |zhangshan |
+--------+-------------------------------------------------+----------------+
21:58:19.524 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c, L:/127.0.0.1:8080 - R:/127.0.0.1:10096] READ COMPLETE
21:58:19.524 [defaultEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIOServer - zhangshan
21:58:21.531 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c, L:/127.0.0.1:8080 - R:/127.0.0.1:10096] READ: 9B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7a 68 61 6e 67 73 68 61 6e |zhangshan |
+--------+-------------------------------------------------+----------------+
21:58:21.532 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c, L:/127.0.0.1:8080 - R:/127.0.0.1:10096] READ COMPLETE
21:58:21.532 [defaultEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIOServer - zhangshan
21:58:36.086 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1, L:/127.0.0.1:8080 - R:/127.0.0.1:10114] REGISTERED
21:58:36.086 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1, L:/127.0.0.1:8080 - R:/127.0.0.1:10114] ACTIVE
21:58:36.100 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1, L:/127.0.0.1:8080 - R:/127.0.0.1:10114] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69 |lisi |
+--------+-------------------------------------------------+----------------+
21:58:36.100 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1, L:/127.0.0.1:8080 - R:/127.0.0.1:10114] READ COMPLETE
21:58:36.100 [defaultEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIOServer - lisi
21:58:38.102 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1, L:/127.0.0.1:8080 - R:/127.0.0.1:10114] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 6c 69 73 69 |lisi |
+--------+-------------------------------------------------+----------------+
21:58:38.103 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1, L:/127.0.0.1:8080 - R:/127.0.0.1:10114] READ COMPLETE
21:58:38.103 [defaultEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIOServer - lisi

nio 工人和非nio功能都绑定了channel来处理数据,从上面的日志可以看到,在使用pipeline中的handler处理msg的过程中线程从nioEventLoopGroup切换到了

defaultEventLoopGroup,这里在执行read事件的handler时会去拿处理该事件的eventLoop,然后判断该evenLoop的绑定的线程是否是当前线程,如果是的话就直接执行,不是的话将handler要执行的代码封装成一个任务交给处理该事件的eventLoop处理。

关键代码:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
NioEventLoop处理普通任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class TestEventLoopExecuteNormalTask {
public static void main(String[] args) throws Exception {
NioEventLoopGroup nioworkers = new NioEventLoopGroup(2);

log.debug("Server start...");
Thread.sleep(2000);

nioworkers.execute(()->{
log.debug("normal task...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

log.debug("main thread ended...");

}
}

输出:

1
2
3
22:48:07.609 [main] DEBUG netty.c2.TestEventLoopExecuteNormalTask - Server start...
22:48:09.617 [main] DEBUG netty.c2.TestEventLoopExecuteNormalTask - main thread ended...
22:48:09.618 [nioEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopExecuteNormalTask - normal task...
NioEventLoop处理定时任务
1
2
3
4
5
6
7
8
9
10
@Slf4j
public class TestEventLoopExecuteCronTask {
public static void main(String[] args) {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(2);
log.debug("server start....");
eventExecutors.scheduleAtFixedRate(()->{
log.debug("running");
}, 0, 1, TimeUnit.SECONDS);
}
}

输出:

1
2
3
4
5
6
22:35:15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...
22:35:17 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:18 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:19 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
22:35:20 [DEBUG] [nioEventLoopGroup-2-1] c.i.o.EventLoopTest2 - running...
...

2 Channel

channel的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

ChannelFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel()
.writeAndFlush(new Date() + ": hello world!");

将之前写的客户端代码进行拆分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TestNettyClientSplited {
public static void main(String[] args) throws Exception {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
}).connect("localhost", 8080);
channelFuture.sync().channel().writeAndFlush("helloworld");
}
}

connect()建立连接的过程是异步的,返回channelFuture对象,channellFuture对象的sync方法可以等待连接建立完成。再通过channel()方法拿到正确的channel对象,最后通过channel对象的writeAndFlush方法发送数据。

下面可以打印sync前后channelFuture对象拿到的channel,看看不同:

1
2
23:23:00.894 [main] DEBUG netty.c1.TestNettyClientSplited - before [id: 0x55501c89]
23:23:00.907 [main] DEBUG netty.c1.TestNettyClientSplited - after [id: 0x55501c89, L:/127.0.0.1:4290 - R:localhost/127.0.0.1:8080]

注意:connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象

处了通过sync方法等待连接建立完成之外,也可以使用回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class TestNettyClientListener {
public static void main(String[] args) throws Exception {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
}).connect("localhost", 8080);

channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println(future.channel());
}
});
}
}

输出:

1
[id: 0x655a38f2, L:/127.0.0.1:4974 - R:localhost/127.0.0.1:8080]

CloseFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class TestCloseChannelFuture {
public static void main(String[] args) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(1);
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));

Channel channel = channelFuture.sync().channel();

new Thread(()->{
Scanner scanner = new Scanner(System.in);
while(true) {
String s = scanner.nextLine();
if ("q".equals(s)) {
channel.close();
// ChannelFuture future = channel.closeFuture();
// try {
// future.sync();
// log.debug("处理channel已关闭,释放group资源");
// group.shutdownGracefully();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
break;
}
channel.writeAndFlush(s);
}
}, "intput").start();

channel.closeFuture().addListener((ChannelFutureListener) future -> {
log.debug("处理channel已关闭,释放group资源");
group.shutdownGracefully();
});
}
}

异步提升的点

使用多线程进行异步操作并不一定能提高任务的处理效率,多线程反而会使响应时间变长,只有合理的划分任务,各个线程配合工作才能提高任务的处理效率。

假设有四个医生每个医生工作8小时,20分钟看一个病人,这样4个医生每天最多看 8 4 3 = 96 个病人。

但是如果将看病这个任务划分成挂号、看病、缴费、取药这些子任务,每个任务5分钟,分配给各个医生单独处理就会达到不一样的效果。

一开始医生2、3、4分别要等5、10、15分钟才能执行工作

但当病人数量增加后四个医生就能满负荷工作,一天可以看 4 8 4 * 3 = 384 效率大概是之前的4倍

总结:

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键

3 Future & Promise

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

同步处理任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class TestSyncExecuteTask {
public static void main(String[] args) throws Exception {
DefaultEventLoop eventExecutor = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutor);

eventExecutor.execute(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("set success, {}", 10);
promise.setSuccess(10);
});

log.debug("start....");
log.debug("promise getNow {}", promise.getNow()); // promise get
log.debug("{}", promise.get());
}
}

输出:

1
2
3
4
11:38:09.620 [main] DEBUG netty.c3.TestSyncExecuteTask - start....
11:38:09.620 [main] DEBUG netty.c3.TestSyncExecuteTask - promise getNow null
11:38:11.626 [defaultEventLoop-1-1] DEBUG netty.c3.TestSyncExecuteTask - set success, 10
11:38:11.626 [main] DEBUG netty.c3.TestSyncExecuteTask - 10

异步处理任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
public class TestAsyncExecuteTask {
public static void main(String[] args) throws Exception {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

promise.addListener(future -> {
log.debug("feature getNow {}", future.getNow());
});


eventExecutors.execute(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("set success {}", 10);
promise.setSuccess(10);
});

}
}

输出:

1
2
11:45:24.818 [defaultEventLoop-1-1] DEBUG netty.c3.TestAsyncExecuteTask - set success 10
11:45:24.819 [defaultEventLoop-1-1] DEBUG netty.c3.TestAsyncExecuteTask - feature getNow 10

同步处理任务失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class TestSyncTaskFail {
public static void main(String[] args) {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
RuntimeException runtimeException = new RuntimeException("error...");
promise.setFailure(runtimeException);
});

try {
// promise.sync();
// promise.get();
promise.await(); // await 不会抛出异常
} catch (Exception e) {
e.printStackTrace();
}


if (!promise.isSuccess()) {
log.debug("error: ",promise.cause());
}

}
}

异步处理任务失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class TestAsyncTaskFAil {
public static void main(String[] args) {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

promise.addListener(future -> {
log.debug("result: {}", future.isSuccess() ? future.getNow() : future.cause());
});

eventExecutors.execute(()->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
RuntimeException runtimeException = new RuntimeException("error...");
promise.setFailure(runtimeException);
});

}
}

await 死锁检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class TestAwaitDeadLock {
public static void main(String[] args) {
DefaultEventLoop eventExecutors = new DefaultEventLoop();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

eventExecutors.execute(()->{
log.debug("1");
try {
promise.await();// 再event loop 线程当中调用阻塞操作会造成死锁 抛出BlockingOperationException
} catch (Exception e) {
log.debug("", e);
}
log.debug("2");
});

// eventExecutors.execute(()->{
// try {
// promise.await();
// } catch (Exception e) {
// e.printStackTrace();
// }
// });
}
}

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13:10:54.357 [defaultEventLoop-1-1] DEBUG netty.c3.TestAwaitDeadLock - 1
13:10:54.358 [defaultEventLoop-1-1] DEBUG netty.c3.TestAwaitDeadLock -
io.netty.util.concurrent.BlockingOperationException: DefaultPromise@6f58ba01(incomplete)
at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
at netty.c3.TestAwaitDeadLock.lambda$main$0(TestAwaitDeadLock.java:16)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
13:10:54.359 [defaultEventLoop-1-1] DEBUG netty.c3.TestAwaitDeadLock - 2

4 handler & pipeline

channelHandler用来处理channel上的各种事件,分为入站和出站两种。pipeline 由一连串的handler组成。

  • 入站处理器用于读取客户端数据,写回结果。通常是 ChannelInboundHandlerAdapter 的子类。
  • 出站处理器用于对写回结果的加工。通常是 ChannelOutboundHandlerAdapter 的子类。

类比:

channel是一个产品的加工车间,pipeline是该车间的流水线,handler就是 流水线上的一道道工序,bytebuf就是原材料,先经过一道道入站工序,再经过一道道出站工序最后变成产品。

handler的处理顺序:

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class TestPipelineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(1);
ctx.fireChannelRead(msg);// 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(2);
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(3);
ctx.channel().write(msg);// 3
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(4);
ctx.write(msg, promise);// 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(5);
ctx.write(msg, promise);// 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(6);
ctx.write(msg, promise);// 6
}
});
}
})
.bind(new InetSocketAddress("localhost", 8080));
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class TestPipelineClient {
public static void main(String[] args) throws Exception {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup(1))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
channel.writeAndFlush("hello world!");
}
}

服务端输出:

1
2
3
4
5
6
1
2
3
6
5
4

可以看到入站处理器是按照addLast的顺序执行的,出站处理器是按照addLast的逆序执行的。ChannelPipline的实现是一个ChannelHandlerContext组成的双向链表

  • 入站处理器通过调用ctx.fireChannelRead来调用下一个入站处理器。
    • 注释掉 1 处的代码 就仅仅会打印 1
    • 注释掉2处的代码 仅仅打印 1、2
  • 3处的ctx.channel.write(msg)方法会从pipeline尾部寻找出战处理器执行
    • 注释掉3处的代码就只会打印1、2、3
  • 调用ctx.write方法会从当前处理器向上寻找前一个出站处理器

    • 注释6处的代码就只会打印1、2、3、6
  • ctx.channel().write(msg) vs ctx.write(msg)

    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己

5 ByteBuf

ByteBuf是对字节数据的封装

1 创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TestByteBuf {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
log(buffer);
}

private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}

2 直接内存 vs 堆内存

创建基于堆的ByteBuf

1
ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();

创建基于直接内存的ByteBuf

1
ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

3 池化、非池化

  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

4 ByteBuf的 组成

一开始读写指针都在0位置

5 写入

方法列表,省略一些不重要的方法

方法签名 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一字节 01\ 00 代表 true\ false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
  • 网络传输,默认习惯是 Big Endian
  • 大端与小端 https://zhuanlan.zhihu.com/p/316347205

先写入 4 个字节

1
2
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

结果是

1
2
3
4
5
6
read index:0 write index:4 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+

再写入一个 int 整数,也是 4 个字节

1
2
buffer.writeInt(5);
log(buffer);

结果是

1
2
3
4
5
6
read index:0 write index:8 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

6 扩容

初始容量不够时就会对ByteBuf扩容,再向ByteBuf中写入一个int

1
2
buffer.writeInt(6);
log(buffer);
  • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
  • 扩容不能超过 max capacity 会报错
1
2
3
4
5
6
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 06 |............ |
+--------+-------------------------------------------------+----------------+

7 读取

例如读了 4 次,每次一个字节

1
2
3
4
5
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分

1
2
3
4
5
6
7
8
9
10
1
2
3
4
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+

如果需要重复读取 int 整数 5,怎么办?

可以在 read 前先做个标记 mark

1
2
3
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);

结果

1
2
3
4
5
6
7
5
read index:8 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 06 |.... |
+--------+-------------------------------------------------+----------------+

这时要重复读取的话,重置到标记位置 reset

1
2
buffer.resetReaderIndex();
log(buffer);

这时

1
2
3
4
5
6
read index:4 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 00 00 00 06 |........ |
+--------+-------------------------------------------------+----------------+

还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index

8 retain & release

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

回收内存的源码实现,请关注下面方法的不同实现

protected abstract void deallocate()

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

谁来负责 release 呢?

不是我们想象的(一般情况下)

1
2
3
4
5
6
ByteBuf buf = ...
try {
...
} finally {
buf.release();
}

请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)

基本规则是,谁是最后使用者,谁负责 release,详细分析如下

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 处理原则
    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则
    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则
    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

TailContext 释放未处理消息逻辑

1
2
3
4
5
6
7
8
9
10
// io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}

具体代码

1
2
3
4
5
6
7
// io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}

9 slice

【零拷贝】对原始的ByteBuf进行切片,但是并没有发生复制,还是共用一块内存,切片后的ByteBuf维护独立的read、write指针

例,原始 ByteBuf 进行一些初始操作

1
2
3
4
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+

这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write

1
2
3
ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果执行,会报 IndexOutOfBoundsException 异常

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+

如果原始 ByteBuf 再次读操作(又读了一个字节)

1
2
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 04 |.. |
+--------+-------------------------------------------------+----------------+

这时的 slice 不受影响,因为它有独立的读写指针

1
System.out.println(ByteBufUtil.prettyHexDump(slice));

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04 |... |
+--------+-------------------------------------------------+----------------+

如果 slice 的内容发生了更改

1
2
slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05 |... |
+--------+-------------------------------------------------+----------------+

这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存

1
System.out.println(ByteBufUtil.prettyHexDump(origin));

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05 |.. |
+--------+-------------------------------------------------+----------------+

10 duplicate

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的

11 copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关

12 CompositeByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝

有两个 ByteBuf 如下

1
2
3
4
5
6
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));

输出

1
2
3
4
5
6
7
8
9
10
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 |..... |
+--------+-------------------------------------------------+----------------+
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a |..... |
+--------+-------------------------------------------------+----------------+

现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?

方法1:

1
2
3
4
5
ByteBuf buf3 = ByteBufAllocator.DEFAULT
.buffer(buf1.readableBytes()+buf2.readableBytes());
buf3.writeBytes(buf1);
buf3.writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

结果

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+

这种方法好不好?回答是不太好,因为进行了数据的内存复制操作

方法2:

1
2
3
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buf3.addComponents(true, buf1, buf2);

结果是一样的

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+

CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作

这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf

1
2
3
4
5
6
7
8
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

输出

1
2
3
4
5
         +-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+

也可以用来包装普通字节数组,底层也不会有拷贝操作

1
2
3
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));

输出

1
2
3
4
5
6
class io.netty.buffer.CompositeByteBuf
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 |...... |
+--------+-------------------------------------------------+----------------+

ByteBuf 优势

  • 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf

EchoServer

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class EchoServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup(2))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = msg instanceof ByteBuf ? (ByteBuf) msg : null;
if (buffer != null) {
System.out.println(Charset.defaultCharset().decode(buffer.nioBuffer()).toString());
ByteBuf response = ByteBufAllocator.DEFAULT.buffer();
response.writeBytes(buffer);
ctx.writeAndFlush(buffer);
// 思考需要释放buffer吗
// 需要释放buffer应为 没有继续向下传递交给尾部处理器处理
buffer.release();
// 思考需要释放responese吗
// 不用释放responese 英文 ctx.writeAndFlush 源码当中最后会交给头部处理器处理
}
}
});
}
}).channel(NioServerSocketChannel.class)
.bind(8080);
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class EchoClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
buffer.release();
// 思考:需要释放 buffer 吗
// buffer需要自己释放,因为没有继续向下传递给下一个入站处理器
// 当然也可以调用super.channelRead(ctx,stu);传递给pipeline的尾部的处理器中进行处理
}
});
}
}).connect("127.0.0.1", 8080).sync().channel();

channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});

new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
}
}