NIO基础
三大组件
Channel & Buffer
channel 是读写数据的双向通道,可以从channel将数据读入到buffer,也可以将buffer的数据写入到channel。
常见Channel
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
buffer是数据的缓存区,常见的buffer有
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
Selector
一开始服务器每开启一个socket连接,就分配一个线程进行处理。这种方式内存开销大,线程的上下文切换成本高,只适合连接数少的情况。
然后引入线程池来管理线程,使用线程池当中的线程来处理socket,这样可以实现线程的复用,但是一个线程还是只能处理一个socket连接,仅仅适合短连接的常见。
最后nio使用selector来实现多路复用,一个selector可以管理多个channel,一个selector绑定一个线程。selector的select()方法会将绑定的线程阻塞,防止该线程空跑占用cpu资源,当selector管理的channel发生读写事件时,selector就会将这些事件交给threadchu’l
ByteBuffer
使用FileChannel读取data.txt文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Slf4j public class TestFileChannel { public static void main(String[] args) { try (RandomAccessFile file = new RandomAccessFile(TestFileChannel.class.getClassLoader().getResource("data.txt").getFile(), "rw") ) { FileChannel fileChannel = file.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(5); do { int len = fileChannel.read(buffer); log.info("len: {}", len); if (len == -1) break; buffer.flip(); while (buffer.hasRemaining()) { log.info("{}", (char) buffer.get()); } buffer.clear(); } while (true); } catch (Exception e) { e.printStackTrace(); } } }
|
ByteBuffer的正确使用方法
- 通过channel读取数据存入ByteBuffer,channel.read(buf)
- 通过flip()将buffer切换为读模式
- 通过buffer.get() 读取数据
- 通过clear(), compact()将buffer切换到写模式
ByteBuffer结构
ByteBuffer常用方法
分配空间allocate
1
| Bytebuffer buf = ByteBuffer.allocate(16);
|
向ByteBuffer写数据
- 使用channel.read(buffer)
- buffer.put
从ByteBuffer读取数据
- channel.write(buf)
1
| int writeBytes = channel.write(buf);
|
- buffer.get
get()方法会使position指针移动,使用rewind方法重置指针,或者使用get(index)方法不会移动指针
可以调用 rewind 方法将 position 重新置为 0
或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针
mark & reset
通过调用mark方法可以记录当前指针的位置,再次调用reset方法可以恢复position到之前标记的位置
filip 与 rewind 都会清除mark标记
字符长与ByteBuffer互相转换
字符串2buffer
1 2
| ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好"); ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");
|
buffer2字符长
1 2 3
| CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1); System.out.println(buffer3.getClass()); System.out.println(buffer3.toString());
|
线程安全性
⚠bytebuffer是非线程安全的。
scattering read
分散读可以直接将数据读入多个bytebuffer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Slf4j public class TestScatteringReads { public static void main(String[] args) { String file = TestScatteringReads.class.getClassLoader().getResource("words.txt").getFile(); try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) { FileChannel channel = accessFile.getChannel(); ByteBuffer buf1 = ByteBuffer.allocate(3); ByteBuffer buf2 = ByteBuffer.allocate(3); ByteBuffer buf3 = ByteBuffer.allocate(6); long read = channel.read(new ByteBuffer[]{buf1, buf2, buf3}); log.info("read bytes {}", read);
buf1.flip(); buf2.flip(); buf3.flip();
ByteBufferUtil.debugRead(buf1); ByteBufferUtil.debugRead(buf2); ByteBufferUtil.debugRead(buf3);
} catch (IOException e) { e.printStackTrace(); } } }
|
gathring writes
集中写,将多个bytebuffer的数据写入文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class TestGatheringWrites { public static void main(String[] args) { String file = TestScatteringReads.class.getClassLoader().getResource("3parts.txt").getFile(); System.out.println(file); try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) { FileChannel channel = accessFile.getChannel(); ByteBuffer buf1 = ByteBuffer.allocate(4); ByteBuffer buf2 = ByteBuffer.allocate(4);
buf1.put(buf2); buf1.put(new byte[] {'a', 'b', 'c', 'd'}); buf2.put(new byte[] {'e', 'f', 'g', 'h'});
buf1.flip();
ByteBufferUtil.debugRead(buf1);
channel.write(new ByteBuffer[] {buf1, buf2});
} catch (IOException e) { e.printStackTrace(); } } }
|
处理黏包和半包
通过TCP发送多条数据给服务端,假设发送下面三条数据
- Hello,world\n
- I’m zhangsan\n
- How are you?\n
但是在发送过程中由于tcp协议、bytebuffer大小等原因会发生黏包和半包现象,上面的三条数据被组合成了下面两条数据
- Hello,world\nI’m zhangsan\nHo
- w are you?\n
利用分隔符的方式,可以将发送的数据重新解码成正常的三条数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class TestStikeyHalfPkg { public static void main(String[] args) { ByteBuffer byteBuffer = ByteBuffer.allocate(32); byteBuffer.put("Hello,world\nI'm zhangsan\nHo".getBytes());
split(byteBuffer); byteBuffer.put("w are you?\nhaha!\n".getBytes()); split(byteBuffer); }
public static void split(ByteBuffer buffer) { buffer.flip(); ByteBufferUtil.debugAll(buffer); int oldLimit = buffer.limit(); for (int i = 0; i < oldLimit; i++) { byte b = buffer.get(i); if (b == '\n') { System.out.println(buffer.position()); ByteBuffer target = ByteBuffer.allocate(i - buffer.position() + 1); buffer.limit(i + 1); target.put(buffer); ByteBufferUtil.debugAll(target); buffer.limit(oldLimit); } } buffer.compact(); } }
|
网络编程
阻塞与非阻塞
阻塞
阻塞
在阻塞模式下,所有的方法会阻塞当前线程继续运行,当前线程将会暂停。
- ServerSocketChannel.accept() 方法会阻塞线程运行,只有新的连接建立后才会让线程继续运行
- SocketChannel.read() 方法当读不到channel数据时也会阻塞
阻塞的特点就是暂停当前线程,该线程不会占用cpu,该线程也不能去做其他事情
单线程
在单线程下阻塞方法之间相互影响,几乎不能正常运行,因此需要使用多线程
多线程
不同位数的jvm 每个线程占用的内存也不同, 32位的jvm 一个线程 320k, 64位的jvm一个线程 1024k,如果每个连接用一个线程处理 ,连接过多就会导致oom,而且线程过多,会因为频繁的上下文切换导致性能降低。
引入线程池后虽然能减少线程的上下文切换,但是如果建立过多的长连接也会导致线程池当中的线程阻塞,因此只适合处理短链接
client
1 2 3 4 5 6 7
| public class Client { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); boolean connected = socketChannel.connect(new InetSocketAddress("localhost", 8080)); System.out.println("waiting..."); } }
|
server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Slf4j public class Server { public static void main(String[] args) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(16); try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(8080)); boolean blocking = ssc.isBlocking(); System.out.println(blocking); List<SocketChannel> channels = new ArrayList<>(); while (true) { log.info("connecting......."); SocketChannel channel = ssc.accept(); log.info("connected"); channels.add(channel);
for (SocketChannel socketChannel : channels) { log.info("before read ...{}", socketChannel); socketChannel.read(buffer); buffer.flip(); ByteBufferUtil.debugAll(buffer); buffer.clear(); log.debug("after read ... {}", socketChannel); } } }
} }
|
非阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Slf4j public class ServerNonBlocking { public static void main(String[] args) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(16); try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(8080)); ssc.configureBlocking(false); boolean blocking = ssc.isBlocking(); System.out.println(blocking); List<SocketChannel> channels = new ArrayList<>(); while (true) {
SocketChannel channel = ssc.accept(); if (channel != null) { log.info("connected"); channel.configureBlocking(false); channels.add(channel); }
for (SocketChannel socketChannel : channels) {
int read = socketChannel.read(buffer); if(read > 0) { buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); log.debug("after read ... {}", socketChannel); } } } } } }
|
多路复用
单线程通过Selector可以防止非阻塞模式下线程一直运行,Selector可以监控多个Channel的读写事件
- 多路复用只能用于网络IO,文件IO无法多路复用
- 通过Selector可以实现事件触发时才唤醒线程进行处理
- 有连接事件时去处理连接事件
- 有读事件时去处理读事件
- 有写事件时去处理写事件
优点
- 事件触发,防止线程做无用功,白白占用cpu
- 让一个线程充分的被利用
- 节约线程的数量
- 可以减少线程的上下文切换
Selector的创建
1
| Selector selector = Selector.open()
|
Channel绑定Selector
1 2
| socketchannel.configureBlocking(false); socketchannel.register(selector, SelectionKeys.OPS_ACCEPT)
|
- channel需使用非阻塞模式
- fileChannel无非阻塞模式,所以不能和Selector一起使用
- 可绑定的事件类型
- connect - 客户端连接成功时触发
- accept - 服务器端成功接受连接时触发
- read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
Channel事件监听
1.阻塞直到事件发送
1
| int count = selector.select()
|
2.阻塞到事件发送或者超时
1
| int count = selector.select(long timeout)
|
3.不阻塞
1
| int count = selector.selectNow()
|
Selector 不阻塞继续运行的情况
- 事件发生
- 调用selector.wakeup()
- 调用 selector.close()
- selector 所在线程 interrupt
处理accept 和 read 事件
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Slf4j public class ServerSelector { public static void main(String[] args) throws Exception {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) { ssc.bind(new InetSocketAddress(8080)); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) { int count = selector.select(); log.debug("select count: {}", count);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel c = (ServerSocketChannel) key.channel(); SocketChannel sc = c.accept(); log.debug("{}",sc); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { try { ByteBuffer buffer = ByteBuffer.allocate(128); SocketChannel c = (SocketChannel) key.channel(); int read = c.read(buffer); System.out.println(read); if (read == -1) { log.debug("end of the stream key cancel"); key.cancel(); c.close(); } else { buffer.flip(); ByteBufferUtil.debugRead(buffer); buffer.clear(); } } catch (IOException e) { key.cancel(); } } iterator.remove(); } }
} catch (Exception e) { e.printStackTrace(); } } }
|
注意
1.事件处理完成之后必须将该事件从 selectkeys set中移除,否则下次循环还会进入该事件的分支当中
因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如
- 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
- 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,仍然会进入key.isaccept的处理逻辑,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
2.事件发生后必须处理,如果不处理该事件会不断触发
3.调用Key.cancel后会取消注册在selector上的channel 并且 将key 从keyset中移除
处理边界问题
由于ByteBuffer的大小是固定的,而消息的长度可长可短,因此发送的消息在被bytebuffer接收时很有可能是不完整的,或者多条消息粘连在一起。
因此需要对边界问题进行处理,有以下几种方式解决:
- 客户端和服务器按照约定,按照固定长度收发数据,但是这样会浪费带宽
- 另外一种方式就是约定消息的分隔符,按照分隔符进行分割,缺点是效率比较低
- TLV格式,Type、Length、Value,一条消息包含类型和长度信息,类型和长度信息的大小可以先约定好,这样就可以很方便获取消息的大小,分配合适的buffer。缺点是buffer需要提前分配,影响server的吞吐量
- HTTP 1.1 是TLV格式
- HTTP 2.0 是LTV格式
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| @Slf4j public class ServerSelector { public static void main(String[] args) throws Exception {
try (ServerSocketChannel ssc = ServerSocketChannel.open()) { ssc.bind(new InetSocketAddress(8080)); ssc.configureBlocking(false); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) { int count = selector.select(); log.debug("select count: {}", count);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { ServerSocketChannel c = (ServerSocketChannel) key.channel(); SocketChannel sc = c.accept(); log.debug("{}",sc); sc.configureBlocking(false); ByteBuffer buf = ByteBuffer.allocate(16); sc.register(selector, SelectionKey.OP_READ, buf); } else if (key.isReadable()) { try { SocketChannel c = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = c.read(buffer); if (read == -1) { log.debug("end of the stream key cancel"); key.cancel(); c.close(); } else { split(buffer); if (buffer.position() == buffer.limit()) { ByteBuffer newByteBuffer = ByteBuffer.allocate(buffer.capacity() * 2); buffer.flip(); newByteBuffer.put(buffer); key.attach(newByteBuffer); } } } catch (IOException e) { key.cancel(); } } iterator.remove(); } }
} catch (Exception e) { e.printStackTrace(); } }
public static void split(ByteBuffer buffer) { buffer.flip(); ByteBufferUtil.debugRead(buffer); int oldLimit = buffer.limit(); for (int i = 0; i < oldLimit; i++) { byte b = buffer.get(i); if (b == '\n') { System.out.println(buffer.position()); ByteBuffer target = ByteBuffer.allocate(i - buffer.position() + 1); buffer.limit(i + 1); target.put(buffer); ByteBufferUtil.debugAll(target); buffer.limit(oldLimit); } } buffer.compact(); } }
|
Client
1
| sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
|
初始分配了16个字节的bytebuffer,发送的消息超过这个大小是就会将buffer扩容,上面的代码示例中采用了 分隔符处理消息的形式
- 注意点 bytebuffer的分配
- 每个channel应该单独分配一个bytebuffer,bytebuffer 不能共用,因为每个channel的消息都有可能发生截断
- bytebuffer不能太大,假如一个bytebuffer是1m 如果需要支持百万连接,内存就需要1Tb的内存
- 可以先分配一个初始大小的buffer例如4k, 不够用的时候在分配一个新的buffer,cap翻一倍,优点是消息连续容易处理,缺点是会拷贝消耗性能
- 也可以通过bytebuffer数组来实现,一个数组不够用就把多出来的内容写入新的buffer,但是这样消息不连续,解析很复杂,优点是避免了拷贝,降低了性能损耗
处理write事件
底层的socket缓冲区是有一定的大小的,buffer中的数据可能不能一次写入到channel中,而只要向channel中写数据就会触发write事件,因此只有当buffer中数据还有剩余是才需要监听write事件,当数据写完时再取消write事件的监听
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Slf4j public class WriteServer { public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.bind(new InetSocketAddress(8080)); ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) { selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) { SelectionKey key = iterator.next();
if (key.isAcceptable()) { ServerSocketChannel c = (ServerSocketChannel) key.channel(); SocketChannel sc = c.accept(); sc.configureBlocking(false); ByteBuffer buffer = Charset.defaultCharset().encode("1234"); int write = sc.write(buffer); log.debug("actual write bytes num {}", write); if (buffer.hasRemaining()) { sc.register(selector, SelectionKey.OP_WRITE, buffer); } } else if (key.isWritable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); int write = sc.write(buffer); log.debug("actual write bytes num {}", write); if (!buffer.hasRemaining()) { key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); key.attach(null); } } iterator.remove(); }
}
} }
|
Client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Slf4j public class WriteServerClient { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Selector selector = Selector.open();
socketChannel.register(selector, (SelectionKey.OP_CONNECT | SelectionKey.OP_READ));
socketChannel.connect(new InetSocketAddress("localhost", 8080));
int count = 0; while (true) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isConnectable()) { log.debug("connected"); System.out.println(socketChannel.finishConnect()); } else if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024); count += socketChannel.read(buffer); buffer.clear(); System.out.println(count); } }
}
} }
|
多线程优化
通过多线程可以对代码进一步优化,使用一个线程作为boss线程专门处理accept事件,使用与cpu核心数相同的worker线程轮流处理read事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| @Slf4j public class ServerSelectorWithMutiThread { public static void main(String[] args) throws Exception { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); WorkerEventLoop[] workers = new WorkerEventLoop[2]; for (int i = 0; i < workers.length; i++) { workers[i] = new WorkerEventLoop(i); } AtomicInteger cnt = new AtomicInteger(); while (true) { boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected {}", sc); workers[cnt.getAndIncrement() % workers.length].register(sc); } } } }
public static class WorkerEventLoop implements Runnable { private Selector worker; private volatile boolean start = false; private int index;
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); public WorkerEventLoop(int index) { this.index = index; }
public void register(SocketChannel sc) throws IOException { if (!start) { worker = Selector.open(); new Thread(this, "worker-" + index).start(); start =true; } queue.add(()-> { try { log.debug("sc.register select"); sc.register(worker, SelectionKey.OP_READ, null); } catch (IOException e) { throw new RuntimeException(e); } }); worker.wakeup(); }
@Override public void run() { while (true) { try { log.debug("worker.select()"); worker.select(); Runnable task = queue.poll(); if (task != null) { task.run(); } Iterator<SelectionKey> iterator = worker.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.info("channel {}", channel.getRemoteAddress()); int read = channel.read(buffer); buffer.flip(); ByteBufferUtil.debugAll(buffer); } } } catch (IOException e) { throw new RuntimeException(e); } } } } }
|
对上面代码的boss 进行封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| package nio.c2;
import lombok.extern.slf4j.Slf4j; import util.ByteBufferUtil;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger;
@Slf4j public class ServerSelectorWithMutiThread { public static void main(String[] args) throws Exception { BossEventLoop bossEventLoop = new BossEventLoop(); bossEventLoop.register(); }
public static class BossEventLoop implements Runnable { private Selector boss; private WorkerEventLoop[] workers; private volatile boolean start = false; private AtomicInteger cnt;
public void register() throws IOException { if (!start) { cnt = new AtomicInteger(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8080)); workers = new WorkerEventLoop[2]; for (int i = 0; i < workers.length; i++) { workers[i] = new WorkerEventLoop(i); } Thread bossThread = new Thread(this, "boss");
bossThread.start(); start = true; } }
@Override public void run() { while (true) { try { log.debug(" is daemon {}", Thread.currentThread().isDaemon()); boss.select(); Iterator<SelectionKey> iterator = boss.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected {}", sc); workers[cnt.getAndIncrement() % workers.length].register(sc); } } } catch (IOException e) { throw new RuntimeException(e); } } } }
public static class WorkerEventLoop implements Runnable { private Selector worker; private volatile boolean start = false; private int index;
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); public WorkerEventLoop(int index) { this.index = index; }
public void register(SocketChannel sc) throws IOException { if (!start) { worker = Selector.open(); new Thread(this, "worker-" + index).start(); start =true; } queue.add(()-> { try { log.debug("sc.register select"); sc.register(worker, SelectionKey.OP_READ, null); } catch (IOException e) { throw new RuntimeException(e); } }); worker.wakeup(); }
@Override public void run() { while (true) { try { log.debug("worker.select()"); worker.select(); Runnable task = queue.poll(); if (task != null) { task.run(); } Iterator<SelectionKey> iterator = worker.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.info("channel {}", channel.getRemoteAddress()); int read = channel.read(buffer); buffer.flip(); ByteBufferUtil.debugAll(buffer); } } } catch (IOException e) { throw new RuntimeException(e); } } } } }
|
获取cpu的核数
- Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
- 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启
UDP
- UDP是无连接的协议,client发送数据不需要管Server是否开启
- Server端的receive方法会将接收到的数据存入bytebuffer,但是如果数据报文超过bytebuffer的大小就会将多出来的数据抛弃
Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Slf4j public class UDPServer { public static void main(String[] args) throws IOException { DatagramChannel channel = DatagramChannel.open(); channel.configureBlocking(false); channel.bind(new InetSocketAddress(8081)); System.out.println("waiting.....");
Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(32));
while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = (ByteBuffer) key.attachment(); DatagramChannel dc = (DatagramChannel) key.channel(); dc.receive(buffer); ByteBufferUtil.debugAll(buffer); } } } } }
|
client
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j public class UDPClient { public static void main(String[] args) throws IOException { DatagramChannel channel = DatagramChannel.open();
ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好"); int send = channel.send(buffer, new InetSocketAddress("localhost",8081));
log.debug("debug: {}", send); } }
|
NIO & BIO
stream vs channel
- stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
- stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
- 二者均为全双工,即读写可以同时进行
IO模型
io模型可以分为:
同步:
只用一个线程,该线程自己去获取结果
异步:
线程自己不去获取结果,由另外的线程获取结果,至少两个线程
当调用一次channel.read或stream.read后会切换到操作系统的内核态,来实现数据读取,分别为:
阻塞 IO
非阻塞 IO
多路复用
异步 IO
阻塞 IO vs 多路复用
零拷贝
传统IO问题
传统IO是将磁盘上的一个文件通过socket写出,会经过内核缓冲区、用户缓冲区、socket缓冲区再到网卡
1 2 3 4 5 6 7 8
| File f = new File("helloword/data.txt"); RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()]; file.read(buf);
Socket socket = ...; socket.getOutputStream().write(buf);
|
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
内部工作流程是这样的:
java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO
从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝
接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的
- 用户态与内核态的切换发生了 3 次,这个操作比较重量级
- 数据拷贝了共 4 次
NIO 优化
通过 DirectByteBuf
- ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
- ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存
大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用
- 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
- java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
- DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
- 通过专门线程访问引用队列,根据虚引用释放堆外内存
- 减少了一次数据拷贝,用户态与内核态的切换次数没有减少
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
- 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu
可以看到
- 只发生了一次用户态与内核态的切换
- 数据拷贝了 3 次
进一步优化(linux 2.4)
- java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
- 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
- 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu
整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有
- 更少的用户态与内核态的切换
- 不利用 cpu 计算,减少 cpu 缓存伪共享
- 零拷贝适合小文件传输
AIO
AIO 用来解决数据复制阶段的阻塞问题
- 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
- 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果
异步模型需要底层操作系统(Kernel)提供支持
- Windows 系统通过 IOCP 实现了真正的异步 IO
- Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势
文件AIO
网络AIO
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| package nio.c4;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset;
@Slf4j public class AioServer {
public static void main(String[] args) throws IOException { AsynchronousServerSocketChannel asc = AsynchronousServerSocketChannel.open(); asc.bind(new InetSocketAddress(8080)); asc.accept(null, new AcceptHandler(asc)); System.in.read(); }
private static void closeChannel(AsynchronousSocketChannel sc) { try { System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress()); sc.close(); } catch (IOException e) { e.printStackTrace(); } }
private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel sc;
public ReadHandler(AsynchronousSocketChannel sc) { this.sc = sc; }
@Override public void completed(Integer result, ByteBuffer attachment) { try { if (result == -1) { closeChannel(sc); return; } System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress()); attachment.flip(); System.out.println(Charset.defaultCharset().decode(attachment)); attachment.clear(); sc.read(attachment, attachment, this); } catch (IOException e) { e.printStackTrace(); } }
@Override public void failed(Throwable exc, ByteBuffer attachment) { closeChannel(sc); exc.printStackTrace(); } }
private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel sc;
private WriteHandler(AsynchronousSocketChannel sc) { this.sc = sc; }
@Override public void completed(Integer result, ByteBuffer attachment) { if (attachment.hasRemaining()) { sc.write(attachment); } }
@Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); closeChannel(sc); } }
private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> { private final AsynchronousServerSocketChannel ssc;
public AcceptHandler(AsynchronousServerSocketChannel ssc) { this.ssc = ssc; }
@Override public void completed(AsynchronousSocketChannel sc, Object attachment) { try { System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress()); } catch (IOException e) { throw new RuntimeException(e); } ByteBuffer buffer = ByteBuffer.allocate(1); sc.read(buffer, buffer, new ReadHandler(sc)); sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc)); ssc.accept(null, this); }
@Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } } }
|