Netty入门 什么是Netty
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
HelloWorld 服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class TestNettyServer { public static void main (String[] args) { new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder ()); ch.pipeline().addLast(new SimpleChannelInboundHandler <String>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }); } }) .bind(8080 ); } }
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class TestNettyClient { public static void main(String[] args) throws Exception { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new StringEncoder()); } }) .connect("localhost", 8080) .sync() .channel() .writeAndFlush(new Date() + ": hello world"); } }
概念理解
数据传输的通道
msg
流动的数据,最开始接收是是bytebuf经过pipeline的handler处理加工成其他的对象,输出的时数据对象再编程bytebuf输出
handler
处理数据的工具,可以将多个handler放入pipeline当中,pipeline负责发布事件(读、读取完成…)传递给每一个handler,handler重写自己感兴趣事件的方法进行处理
handler分为Inbound和Outbound
eventLoop
处理数据的工人,每个工人可以处理多个channel的io操作,给工人分配了一个channel后,该channel就一直由该工人处理,相当于工人和channel进行了绑定
工人除了处理io操作,也可以对任务进行处理,每个工人都有自己的任务队列,队列中存放多个channel待处理的任务,任务可以是普通任务和定时任务
工人会按照pipeline的顺序,依次按照handler的代码处理数据,可以为每道工序指定不同的工人
组件 1 EventLoop 事件循环对象 单线程执行器+selector,通过run方法处理Channel上源源不断的IO事件。
继承关系:
一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
另一条线是继承自 netty 自己的 OrderedEventExecutor,
提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组 一组EventLoop,channel通过调用EventLoopGroup的register方法绑定其中一个EventLoop,之后这个channel上的所有io事件都由所绑定的这个eventloop来处理,这样可以保证io事件处理时的线程安全。
继承关系:
继承自 netty 自己的 EventExecutorGroup
实现了 Iterable 接口提供遍历 EventLoop 的能力
另有 next 方法获取集合中下一个 EventLoop
遍历的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class TestEventGroupIterate { public static void main (String[] args) { DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup (2 ); System.out.println(eventExecutors.next()); System.out.println(eventExecutors.next()); System.out.println(eventExecutors.next()); iterateEventGroup(eventExecutors); } private static void iterateEventGroup (DefaultEventLoopGroup group) { for (EventExecutor eventLoop : group) { System.out.println(eventLoop); } } }
输出:
1 2 3 4 5 io.netty.channel.DefaultEventLoop@5eb5c224 io.netty.channel.DefaultEventLoop@53e25b76 io.netty.channel.DefaultEventLoop@5eb5c224 io.netty.channel.DefaultEventLoop@5eb5c224 io.netty.channel.DefaultEventLoop@53e25b76
优雅的关闭 通过shutdownGracefully可以对eventloopgroup进行优雅的关闭,先将eventloopgroup切换到关闭状态,禁止新的任务加入,再处理任务队列中剩余的任务后停止线程的运行,可以确保应用是在整体有序的情况下退出的
例子 NioEventLoop处理IO事件 服务端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public class TestEventLoopGroupIO { public static void main (String[] args) throws Exception { new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { ByteBuf byteBuf = msg instanceof ByteBuf ? (ByteBuf)msg : null ; if (byteBuf != null ) { byte [] bytes = new byte [21 ]; ByteBuf len = byteBuf.readBytes(bytes, 0 , byteBuf.readableBytes()); log.debug(new String (bytes)); } } }); } }) .bind(8080 ).sync(); } }
客户端:
开启3个客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class TestEventLoopGroupIOClient { public static void main (String[] args) throws Exception { Channel channel = new Bootstrap () .group(new NioEventLoopGroup (1 )) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { System.out.println("init...." ); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); } }) .channel(NioSocketChannel.class).connect("localhost" , 8080 ) .sync() .channel(); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu123456789019999" .getBytes())); Thread.sleep(2000 ); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu" .getBytes())); } }
服务端输出:
1 2 3 4 5 6 21:13:32.551 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu123456789019999 21:13:34.550 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu 21:13:34.960 [nioEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIO - wangwu123456789019999 21:13:36.964 [nioEventLoopGroup-2-1] DEBUG netty.c2.TestEventLoopGroupIO - wangwu 21:13:37.096 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu123456789019999 21:13:39.106 [nioEventLoopGroup-2-2] DEBUG netty.c2.TestEventLoopGroupIO - wangwu
从输出日志可以看到,不同客户端的channel分别由不同的EventLoop轮流进行处理。
使用非nioworker处理read事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Slf4j public class TestEventLoopGroupIOServer { public static void main (String[] args) throws Exception { DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup (2 ); new ServerBootstrap () .group(new NioEventLoopGroup (2 )) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); nioSocketChannel.pipeline().addLast(normalWorkers, "myHandler" , new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { ByteBuf byteBuf = msg instanceof ByteBuf ? (ByteBuf)msg : null ; if (byteBuf != null ) { byte [] bytes = new byte [21 ]; ByteBuf len = byteBuf.readBytes(bytes, 0 , byteBuf.readableBytes()); log.debug(new String (bytes)); } } }); } }) .bind(8080 ).sync(); } }
多次开启客户端发送不同消息,server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 77 61 6e 67 77 75 |wangwu | +--------+-------------------------------------------------+----------------+ 21 :57 :38.938 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xafe832bd , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10065 ] READ COMPLETE21 :57 :38.938 [defaultEventLoopGroup-2 -1 ] DEBUG netty.c2.TestEventLoopGroupIOServer - wangwu 21 :57 :40.949 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xafe832bd , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10065 ] READ: 6B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 77 61 6e 67 77 75 |wangwu | +--------+-------------------------------------------------+----------------+ 21 :57 :40.950 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xafe832bd , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10065 ] READ COMPLETE21 :57 :40.950 [defaultEventLoopGroup-2 -1 ] DEBUG netty.c2.TestEventLoopGroupIOServer - wangwu 21 :58 :19.510 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10096 ] REGISTERED21 :58 :19.510 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10096 ] ACTIVE21 :58 :19.524 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10096 ] READ: 9B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 7a 68 61 6e 67 73 68 61 6e |zhangshan | +--------+-------------------------------------------------+----------------+ 21 :58 :19.524 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10096 ] READ COMPLETE21 :58 :19.524 [defaultEventLoopGroup-2 -2 ] DEBUG netty.c2.TestEventLoopGroupIOServer - zhangshan 21 :58 :21.531 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10096 ] READ: 9B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 7a 68 61 6e 67 73 68 61 6e |zhangshan | +--------+-------------------------------------------------+----------------+ 21 :58 :21.532 [nioEventLoopGroup-3 -1 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x11b1e70c , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10096 ] READ COMPLETE21 :58 :21.532 [defaultEventLoopGroup-2 -2 ] DEBUG netty.c2.TestEventLoopGroupIOServer - zhangshan 21 :58 :36.086 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10114 ] REGISTERED21 :58 :36.086 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10114 ] ACTIVE21 :58 :36.100 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10114 ] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 6c 69 73 69 |lisi | +--------+-------------------------------------------------+----------------+ 21 :58 :36.100 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10114 ] READ COMPLETE21 :58 :36.100 [defaultEventLoopGroup-2 -1 ] DEBUG netty.c2.TestEventLoopGroupIOServer - lisi 21 :58 :38.102 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10114 ] READ: 4B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 6c 69 73 69 |lisi | +--------+-------------------------------------------------+----------------+ 21 :58 :38.103 [nioEventLoopGroup-3 -2 ] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xdcec43d1 , L:/127.0 .0 .1 :8080 - R:/127.0 .0 .1 :10114 ] READ COMPLETE21 :58 :38.103 [defaultEventLoopGroup-2 -1 ] DEBUG netty.c2.TestEventLoopGroupIOServer - lisi
nio 工人和非nio功能都绑定了channel来处理数据,从上面的日志可以看到,在使用pipeline中的handler处理msg的过程中线程从nioEventLoopGroup切换到了
defaultEventLoopGroup,这里在执行read事件的handler时会去拿处理该事件的eventLoop,然后判断该evenLoop的绑定的线程是否是当前线程,如果是的话就直接执行,不是的话将handler要执行的代码封装成一个任务交给处理该事件的eventLoop处理。
关键代码:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 static void invokeChannelRead (final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg" ), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable () { @Override public void run () { next.invokeChannelRead(m); } }); } }
NioEventLoop处理普通任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j public class TestEventLoopExecuteNormalTask { public static void main (String[] args) throws Exception { NioEventLoopGroup nioworkers = new NioEventLoopGroup (2 ); log.debug("Server start..." ); Thread.sleep(2000 ); nioworkers.execute(()->{ log.debug("normal task..." ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } }); log.debug("main thread ended..." ); } }
输出:
1 2 3 22 :48 :07.609 [main] DEBUG netty.c2.TestEventLoopExecuteNormalTask - Server start...22 :48 :09.617 [main] DEBUG netty.c2.TestEventLoopExecuteNormalTask - main thread ended...22 :48 :09.618 [nioEventLoopGroup-2 -1 ] DEBUG netty.c2.TestEventLoopExecuteNormalTask - normal task...
NioEventLoop处理定时任务 1 2 3 4 5 6 7 8 9 10 @Slf4j public class TestEventLoopExecuteCronTask { public static void main (String[] args) { NioEventLoopGroup eventExecutors = new NioEventLoopGroup (2 ); log.debug("server start...." ); eventExecutors.scheduleAtFixedRate(()->{ log.debug("running" ); }, 0 , 1 , TimeUnit.SECONDS); } }
输出:
1 2 3 4 5 6 22 :35 :15 [DEBUG] [main] c.i.o.EventLoopTest2 - server start...22 :35 :17 [DEBUG] [nioEventLoopGroup-2 -1 ] c.i.o.EventLoopTest2 - running...22 :35 :18 [DEBUG] [nioEventLoopGroup-2 -1 ] c.i.o.EventLoopTest2 - running...22 :35 :19 [DEBUG] [nioEventLoopGroup-2 -1 ] c.i.o.EventLoopTest2 - running...22 :35 :20 [DEBUG] [nioEventLoopGroup-2 -1 ] c.i.o.EventLoopTest2 - running......
2 Channel channel的主要作用
close() 可以用来关闭 channel
closeFuture() 用来处理 channel 的关闭
sync 方法作用是同步等待 channel 关闭
而 addListener 方法是异步等待 channel 关闭
pipeline() 方法添加处理器
write() 方法将数据写入
writeAndFlush() 方法将数据写入并刷出
ChannelFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) { ch.pipeline().addLast(new StringEncoder ()); } }) .connect("127.0.0.1" , 8080 ) .sync() .channel() .writeAndFlush(new Date () + ": hello world!" );
将之前写的客户端代码进行拆分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class TestNettyClientSplited { public static void main (String[] args) throws Exception { ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup (1 )) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }).connect("localhost" , 8080 ); channelFuture.sync().channel().writeAndFlush("helloworld" ); } }
connect()建立连接的过程是异步的,返回channelFuture对象,channellFuture对象的sync方法可以等待连接建立完成。再通过channel()方法拿到正确的channel对象,最后通过channel对象的writeAndFlush方法发送数据。
下面可以打印sync前后channelFuture对象拿到的channel,看看不同:
1 2 23 :23 :00.894 [main] DEBUG netty.c1.TestNettyClientSplited - before [id: 0x55501c89 ]23 :23 :00.907 [main] DEBUG netty.c1.TestNettyClientSplited - after [id: 0x55501c89 , L:/127.0 .0 .1 :4290 - R:localhost/127.0 .0 .1 :8080 ]
**注意:**connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象
处了通过sync方法等待连接建立完成之外,也可以使用回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j public class TestNettyClientListener { public static void main (String[] args) throws Exception { ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup (1 )) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }).connect("localhost" , 8080 ); channelFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { System.out.println(future.channel()); } }); } }
输出:
1 [id: 0x655a38f2, L:/127.0.0.1:4974 - R:localhost/127.0.0.1:8080]
CloseFuture 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf4j public class TestCloseChannelFuture { public static void main (String[] args) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup (1 ); ChannelFuture channelFuture = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <Channel>() { @Override protected void initChannel (Channel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); Channel channel = channelFuture.sync().channel(); new Thread (()->{ Scanner scanner = new Scanner (System.in); while (true ) { String s = scanner.nextLine(); if ("q" .equals(s)) { channel.close(); break ; } channel.writeAndFlush(s); } }, "intput" ).start(); channel.closeFuture().addListener((ChannelFutureListener) future -> { log.debug("处理channel已关闭,释放group资源" ); group.shutdownGracefully(); }); } }
异步提升的点 使用多线程进行异步操作并不一定能提高任务的处理效率,多线程反而会使响应时间变长,只有合理的划分任务,各个线程配合工作才能提高任务的处理效率。
假设有四个医生每个医生工作8小时,20分钟看一个病人,这样4个医生每天最多看 8 * 4 * 3 = 96 个病人。
但是如果将看病这个任务划分成挂号、看病、缴费、取药这些子任务,每个任务5分钟,分配给各个医生单独处理就会达到不一样的效果。
一开始医生2、3、4分别要等5、10、15分钟才能执行工作
但当病人数量增加后四个医生就能满负荷工作,一天可以看 4 * 8 * 4 * 3 = 384 效率大概是之前的4倍
总结:
单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
异步并没有缩短响应时间,反而有所增加
合理进行任务拆分,也是利用异步的关键
3 Future & Promise netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称
jdk Future
netty Future
Promise
cancel
取消任务
-
-
isCanceled
任务是否取消
-
-
isDone
任务是否完成,不能区分成功失败
-
-
get
获取任务结果,阻塞等待
-
-
getNow
-
获取任务结果,非阻塞,还未产生结果时返回 null
-
await
-
等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
-
sync
-
等待任务结束,如果任务失败,抛出异常
-
isSuccess
-
判断任务是否成功
-
cause
-
获取失败信息,非阻塞,如果没有失败,返回null
-
addLinstener
-
添加回调,异步接收结果
-
setSuccess
-
-
设置成功结果
setFailure
-
-
设置失败结果
同步处理任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Slf4j public class TestSyncExecuteTask { public static void main (String[] args) throws Exception { DefaultEventLoop eventExecutor = new DefaultEventLoop (); DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutor); eventExecutor.execute(()->{ try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } log.debug("set success, {}" , 10 ); promise.setSuccess(10 ); }); log.debug("start...." ); log.debug("promise getNow {}" , promise.getNow()); log.debug("{}" , promise.get()); } }
输出:
1 2 3 4 11 :38 :09.620 [main] DEBUG netty.c3.TestSyncExecuteTask - start....11 :38 :09.620 [main] DEBUG netty.c3.TestSyncExecuteTask - promise getNow null 11 :38 :11.626 [defaultEventLoop-1 -1 ] DEBUG netty.c3.TestSyncExecuteTask - set success, 10 11 :38 :11.626 [main] DEBUG netty.c3.TestSyncExecuteTask - 10
异步处理任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j public class TestAsyncExecuteTask { public static void main (String[] args) throws Exception { DefaultEventLoop eventExecutors = new DefaultEventLoop (); DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); promise.addListener(future -> { log.debug("feature getNow {}" , future.getNow()); }); eventExecutors.execute(()->{ try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } log.debug("set success {}" , 10 ); promise.setSuccess(10 ); }); } }
输出:
1 2 11:45:24.818 [defaultEventLoop-1-1] DEBUG netty.c3.TestAsyncExecuteTask - set success 10 11:45:24.819 [defaultEventLoop-1-1] DEBUG netty.c3.TestAsyncExecuteTask - feature getNow 10
同步处理任务失败 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf4j public class TestSyncTaskFail { public static void main (String[] args) { DefaultEventLoop eventExecutors = new DefaultEventLoop (); DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } RuntimeException runtimeException = new RuntimeException ("error..." ); promise.setFailure(runtimeException); }); try { promise.await(); } catch (Exception e) { e.printStackTrace(); } if (!promise.isSuccess()) { log.debug("error: " ,promise.cause()); } } }
异步处理任务失败 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j public class TestAsyncTaskFAil { public static void main (String[] args) { DefaultEventLoop eventExecutors = new DefaultEventLoop (); DefaultPromise<Integer> promise = new DefaultPromise <>(eventExecutors); promise.addListener(future -> { log.debug("result: {}" , future.isSuccess() ? future.getNow() : future.cause()); }); eventExecutors.execute(()->{ try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException (e); } RuntimeException runtimeException = new RuntimeException ("error..." ); promise.setFailure(runtimeException); }); } }
await 死锁检测 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Slf4j public class TestAwaitDeadLock { public static void main(String[] args) { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ log.debug("1"); try { promise.await();// 再event loop 线程当中调用阻塞操作会造成死锁 抛出BlockingOperationException } catch (Exception e) { log.debug("", e); } log.debug("2"); }); // eventExecutors.execute(()->{ // try { // promise.await(); // } catch (Exception e) { // e.printStackTrace(); // } // }); } }
输出:
1 2 3 4 5 6 7 8 9 10 11 12 13:10:54.357 [defaultEventLoop-1-1] DEBUG netty.c3.TestAwaitDeadLock - 1 13:10:54.358 [defaultEventLoop-1-1] DEBUG netty.c3.TestAwaitDeadLock - io.netty.util.concurrent.BlockingOperationException: DefaultPromise@6f58ba01(incomplete) at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384) at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212) at netty.c3.TestAwaitDeadLock.lambda$main$0(TestAwaitDeadLock.java:16) at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:750) 13:10:54.359 [defaultEventLoop-1-1] DEBUG netty.c3.TestAwaitDeadLock - 2
4 handler & pipeline channelHandler用来处理channel上的各种事件,分为入站和出站两种。pipeline 由一连串的handler组成。
入站处理器用于读取客户端数据,写回结果。通常是 ChannelInboundHandlerAdapter 的子类。
出站处理器用于对写回结果的加工。通常是 ChannelOutboundHandlerAdapter 的子类。
类比:
channel是一个产品的加工车间,pipeline是该车间的流水线,handler就是 流水线上的一道道工序,bytebuf就是原材料,先经过一道道入站工序,再经过一道道出站工序最后变成产品。
handler的处理顺序:
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class TestPipelineServer { public static void main (String[] args) { new ServerBootstrap () .group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(1 ); ctx.fireChannelRead(msg); } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(2 ); ctx.fireChannelRead(msg); } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(3 ); ctx.channel().write(msg); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter () { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println(4 ); ctx.write(msg, promise); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter () { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println(5 ); ctx.write(msg, promise); } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter () { @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println(6 ); ctx.write(msg, promise); } }); } }) .bind(new InetSocketAddress ("localhost" , 8080 )); } }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class TestPipelineClient { public static void main (String[] args) throws Exception { ChannelFuture channelFuture = new Bootstrap () .group(new NioEventLoopGroup (1 )) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }) .connect(new InetSocketAddress ("localhost" , 8080 )); Channel channel = channelFuture.sync().channel(); channel.writeAndFlush("hello world!" ); } }
服务端输出:
可以看到入站处理器是按照addLast的顺序执行的,出站处理器是按照addLast的逆序执行的。ChannelPipline的实现是一个ChannelHandlerContext组成的双向链表
入站处理器通过调用ctx.fireChannelRead来调用下一个入站处理器。
注释掉 1 处的代码 就仅仅会打印 1
注释掉2处的代码 仅仅打印 1、2
3处的ctx.channel.write(msg)方法会从pipeline尾部寻找出战处理器执行
调用ctx.write方法会从当前处理器向上寻找前一个出站处理器
ctx.channel().write(msg) vs ctx.write(msg)
都是触发出站处理器的执行
ctx.channel().write(msg) 从尾部开始查找出站处理器
ctx.write(msg) 是从当前节点 找上一个出站处理器
3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己
5 ByteBuf ByteBuf是对字节数据的封装
1 创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class TestByteBuf { public static void main (String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10 ); log(buffer); } private static void log (ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1 ) + 4 ; StringBuilder buf = new StringBuilder (rows * 80 * 2 ) .append("read index:" ).append(buffer.readerIndex()) .append(" write index:" ).append(buffer.writerIndex()) .append(" capacity:" ).append(buffer.capacity()) .append(StringUtil.NEWLINE); ByteBufUtil.appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); } }
2 直接内存 vs 堆内存 创建基于堆的ByteBuf
1 ByteBuf heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer();
创建基于直接内存的ByteBuf
1 ByteBuf directBuffer = ByteBufAllocator.DEFAULT.directBuffer();
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
3 池化、非池化
直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
4 ByteBuf的 组成
一开始读写指针都在0位置
5 写入 方法列表,省略一些不重要的方法
方法签名
含义
备注
writeBoolean(boolean value)
写入 boolean 值
用一字节 01|00 代表 true|false
writeByte(int value)
写入 byte 值
writeShort(int value)
写入 short 值
writeInt(int value)
写入 int 值
Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value)
写入 int 值
Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value)
写入 long 值
writeChar(int value)
写入 char 值
writeFloat(float value)
写入 float 值
writeDouble(double value)
写入 double 值
writeBytes(ByteBuf src)
写入 netty 的 ByteBuf
writeBytes(byte[] src)
写入 byte[]
writeBytes(ByteBuffer src)
写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)
写入字符串
注意
先写入 4 个字节
1 2 buffer.writeBytes(new byte []{1 , 2 , 3 , 4 }); log(buffer);
结果是
1 2 3 4 5 6 read index:0 write index:4 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 |.... | +--------+-------------------------------------------------+----------------+
再写入一个 int 整数,也是 4 个字节
1 2 buffer.writeInt(5 ); log(buffer);
结果是
1 2 3 4 5 6 read index:0 write index:8 capacity:10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 00 00 00 05 |........ | +--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
6 扩容 初始容量不够时就会对ByteBuf扩容,再向ByteBuf中写入一个int
1 2 buffer.writeInt(6 ); log(buffer);
如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)
扩容不能超过 max capacity 会报错
1 2 3 4 5 6 read index:0 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 01 02 03 04 00 00 00 05 00 00 00 06 |............ | +--------+-------------------------------------------------+----------------+
7 读取 例如读了 4 次,每次一个字节
1 2 3 4 5 System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); log(buffer);
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
1 2 3 4 5 6 7 8 9 10 1 2 3 4 read index:4 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 05 00 00 00 06 |........ | +--------+-------------------------------------------------+----------------+
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
1 2 3 buffer.markReaderIndex(); System.out.println(buffer.readInt()); log(buffer);
结果
1 2 3 4 5 6 7 5 read index:8 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 06 |.... | +--------+-------------------------------------------------+----------------+
这时要重复读取的话,重置到标记位置 reset
1 2 buffer.resetReaderIndex(); log(buffer);
这时
1 2 3 4 5 6 read index:4 write index:12 capacity:16 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 00 00 00 05 00 00 00 06 |........ | +--------+-------------------------------------------------+----------------+
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index
8 retain & release 由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
回收内存的源码实现,请关注下面方法的不同实现
protected abstract void deallocate()
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
每个 ByteBuf 对象的初始计数为 1
调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
谁来负责 release 呢?
不是我们想象的(一般情况下)
1 2 3 4 5 6 ByteBuf buf = ...try { ... } finally { buf.release(); }
请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release ,详细分析如下
起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
入站 ByteBuf 处理原则
对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
出站 ByteBuf 处理原则
出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
异常处理原则
有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
TailContext 释放未处理消息逻辑
1 2 3 4 5 6 7 8 9 10 protected void onUnhandledInboundMessage (Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration." , msg); } finally { ReferenceCountUtil.release(msg); } }
具体代码
1 2 3 4 5 6 7 public static boolean release (Object msg) { if (msg instanceof ReferenceCounted) { return ((ReferenceCounted) msg).release(); } return false ; }
9 slice 【零拷贝】对原始的ByteBuf进行切片,但是并没有发生复制,还是共用一块内存,切片后的ByteBuf维护独立的read、write指针
例,原始 ByteBuf 进行一些初始操作
1 2 3 4 ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10 );origin.writeBytes(new byte []{1 , 2 , 3 , 4 }); origin.readByte(); System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write
1 2 3 ByteBuf slice = origin.slice();System.out.println(ByteBufUtil.prettyHexDump(slice));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
如果原始 ByteBuf 再次读操作(又读了一个字节)
1 2 origin.readByte(); System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 03 04 |.. | +--------+-------------------------------------------------+----------------+
这时的 slice 不受影响,因为它有独立的读写指针
1 System.out.println(ByteBufUtil.prettyHexDump(slice));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 04 |... | +--------+-------------------------------------------------+----------------+
如果 slice 的内容发生了更改
1 2 slice.setByte(2 , 5 ); System.out.println(ByteBufUtil.prettyHexDump(slice));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 02 03 05 |... | +--------+-------------------------------------------------+----------------+
这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存
1 System.out.println(ByteBufUtil.prettyHexDump(origin));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 03 05 |.. | +--------+-------------------------------------------------+----------------+
10 duplicate 【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
11 copy 会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
12 CompositeByteBuf 【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
有两个 ByteBuf 如下
1 2 3 4 5 6 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 );buf1.writeBytes(new byte []{1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5 );buf2.writeBytes(new byte []{6 , 7 , 8 , 9 , 10 }); System.out.println(ByteBufUtil.prettyHexDump(buf1)); System.out.println(ByteBufUtil.prettyHexDump(buf2));
输出
1 2 3 4 5 6 7 8 9 10 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 |..... | +--------+-------------------------------------------------+----------------+ +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 06 07 08 09 0a |..... | +--------+-------------------------------------------------+----------------+
现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?
方法1:
1 2 3 4 5 ByteBuf buf3 = ByteBufAllocator.DEFAULT .buffer(buf1.readableBytes()+buf2.readableBytes()); buf3.writeBytes(buf1); buf3.writeBytes(buf2); System.out.println(ByteBufUtil.prettyHexDump(buf3));
结果
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | +--------+-------------------------------------------------+----------------+
这种方法好不好?回答是不太好,因为进行了数据的内存复制操作
方法2:
1 2 3 CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();buf3.addComponents(true , buf1, buf2);
结果是一样的
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | +--------+-------------------------------------------------+----------------+
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
缺点,复杂了很多,多次操作会带来性能的损耗
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf
1 2 3 4 5 6 7 8 ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5 );buf1.writeBytes(new byte []{1 , 2 , 3 , 4 , 5 }); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5 );buf2.writeBytes(new byte []{6 , 7 , 8 , 9 , 10 }); ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);System.out.println(ByteBufUtil.prettyHexDump(buf3));
输出
1 2 3 4 5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 07 08 09 0a |.......... | +--------+-------------------------------------------------+----------------+
也可以用来包装普通字节数组,底层也不会有拷贝操作
1 2 3 ByteBuf buf4 = Unpooled.wrappedBuffer(new byte []{1 , 2 , 3 }, new byte []{4 , 5 , 6 });System.out.println(buf4.getClass()); System.out.println(ByteBufUtil.prettyHexDump(buf4));
输出
1 2 3 4 5 6 class io.netty.buffer.CompositeByteBuf +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 06 |...... | +--------+-------------------------------------------------+----------------+
ByteBuf 优势
池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
读写指针分离,不需要像 ByteBuffer 一样切换读写模式
可以自动扩容
支持链式调用,使用更流畅
很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf
EchoServer server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class EchoServer { public static void main (String[] args) { new ServerBootstrap () .group(new NioEventLoopGroup (2 )) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buffer = msg instanceof ByteBuf ? (ByteBuf) msg : null ; if (buffer != null ) { System.out.println(Charset.defaultCharset().decode(buffer.nioBuffer()).toString()); ByteBuf response = ByteBufAllocator.DEFAULT.buffer(); response.writeBytes(buffer); ctx.writeAndFlush(buffer); buffer.release(); } } }); } }).channel(NioServerSocketChannel.class) .bind(8080 ); } }
client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class EchoClient { public static void main (String[] args) throws Exception { NioEventLoopGroup group = new NioEventLoopGroup (); Channel channel = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter () { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ByteBuf buffer = (ByteBuf) msg; System.out.println(buffer.toString(Charset.defaultCharset())); buffer.release(); } }); } }).connect("127.0.0.1" , 8080 ).sync().channel(); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); new Thread (() -> { Scanner scanner = new Scanner (System.in); while (true ) { String line = scanner.nextLine(); if ("q" .equals(line)) { channel.close(); break ; } channel.writeAndFlush(line); } }).start(); } }