NIO基础

三大组件

Channel & Buffer

channel 是读写数据的双向通道,可以从channel将数据读入到buffer,也可以将buffer的数据写入到channel。

d79ad454-3e58-48d0-a13c-a94861d8b680

常见Channel

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer是数据的缓存区,常见的buffer有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Selector

  1. 一开始服务器每开启一个socket连接,就分配一个线程进行处理。这种方式内存开销大,线程的上下文切换成本高,只适合连接数少的情况。

  2. 然后引入线程池来管理线程,使用线程池当中的线程来处理socket,这样可以实现线程的复用,但是一个线程还是只能处理一个socket连接,仅仅适合短连接的常见。

  3. 最后nio使用selector来实现多路复用,一个selector可以管理多个channel,一个selector绑定一个线程。selector的select()方法会将绑定的线程阻塞,防止该线程空跑占用cpu资源,当selector管理的channel发生读写事件时,selector就会将这些事件交给threadchu’l

ByteBuffer

使用FileChannel读取data.txt文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class TestFileChannel {
public static void main(String[] args) {
try (RandomAccessFile file = new RandomAccessFile(TestFileChannel.class.getClassLoader().getResource("data.txt").getFile(), "rw") ) {
FileChannel fileChannel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(5);
do {
int len = fileChannel.read(buffer);
log.info("len: {}", len);
if (len == -1)
break;
buffer.flip();
while (buffer.hasRemaining()) {
log.info("{}", (char) buffer.get());
}
buffer.clear();
} while (true);
} catch (Exception e) {
e.printStackTrace();
}
}
}

ByteBuffer的正确使用方法

  1. 通过channel读取数据存入ByteBuffer,channel.read(buf)
  2. 通过flip()将buffer切换为读模式
  3. 通过buffer.get() 读取数据
  4. 通过clear(), compact()将buffer切换到写模式

ByteBuffer结构

  • position
  • capacity
  • limit

e17622de-0ed8-463f-9972-33aa154fcbdd

7d66dd9a-88f7-48a5-8040-914e08c50eb2

3bf46e63-7bd2-4a76-977d-a2d754e78174

a293bcdc-ab02-4a3c-9fce-d3af33897657

ByteBuffer常用方法

分配空间allocate

1
Bytebuffer buf = ByteBuffer.allocate(16);

向ByteBuffer写数据

  1. 使用channel.read(buffer)
  2. buffer.put

从ByteBuffer读取数据

  1. channel.write(buf)
1
int writeBytes = channel.write(buf);
  1. buffer.get
1
byte b = buf.get();

get()方法会使position指针移动,使用rewind方法重置指针,或者使用get(index)方法不会移动指针

可以调用 rewind 方法将 position 重新置为 0

或者调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针

mark & reset

通过调用mark方法可以记录当前指针的位置,再次调用reset方法可以恢复position到之前标记的位置

filip 与 rewind 都会清除mark标记

字符长与ByteBuffer互相转换

字符串2buffer

1
2
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

buffer2字符长

1
2
3
CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

线程安全性

⚠bytebuffer是非线程安全的。

scattering read

分散读可以直接将数据读入多个bytebuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class TestScatteringReads {
public static void main(String[] args) {
String file = TestScatteringReads.class.getClassLoader().getResource("words.txt").getFile();
try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) {
FileChannel channel = accessFile.getChannel();
ByteBuffer buf1 = ByteBuffer.allocate(3);
ByteBuffer buf2 = ByteBuffer.allocate(3);
ByteBuffer buf3 = ByteBuffer.allocate(6);
long read = channel.read(new ByteBuffer[]{buf1, buf2, buf3});
log.info("read bytes {}", read);

buf1.flip();
buf2.flip();
buf3.flip();

ByteBufferUtil.debugRead(buf1);
ByteBufferUtil.debugRead(buf2);
ByteBufferUtil.debugRead(buf3);

} catch (IOException e) {
e.printStackTrace();
}
}
}

gathring writes

集中写,将多个bytebuffer的数据写入文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TestGatheringWrites {
public static void main(String[] args) {
String file = TestScatteringReads.class.getClassLoader().getResource("3parts.txt").getFile();
System.out.println(file);
try (RandomAccessFile accessFile = new RandomAccessFile(file, "rw")) {
FileChannel channel = accessFile.getChannel();
ByteBuffer buf1 = ByteBuffer.allocate(4);
ByteBuffer buf2 = ByteBuffer.allocate(4);

buf1.put(buf2);
buf1.put(new byte[] {'a', 'b', 'c', 'd'});
buf2.put(new byte[] {'e', 'f', 'g', 'h'});

buf1.flip(); // 切换到读模式后才能成功的写入文件
// buf2.flip();
ByteBufferUtil.debugRead(buf1);

channel.write(new ByteBuffer[] {buf1, buf2});


} catch (IOException e) {
e.printStackTrace();
}
}
}

处理黏包和半包

通过TCP发送多条数据给服务端,假设发送下面三条数据

  • Hello,world\n
  • I’m zhangsan\n
  • How are you?\n

但是在发送过程中由于tcp协议、bytebuffer大小等原因会发生黏包和半包现象,上面的三条数据被组合成了下面两条数据

  • Hello,world\nI’m zhangsan\nHo
  • w are you?\n

利用分隔符的方式,可以将发送的数据重新解码成正常的三条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TestStikeyHalfPkg {
public static void main(String[] args) {
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
byteBuffer.put("Hello,world\nI'm zhangsan\nHo".getBytes()); // 黏包

split(byteBuffer);
byteBuffer.put("w are you?\nhaha!\n".getBytes());
split(byteBuffer);
}

public static void split(ByteBuffer buffer) {
buffer.flip();
ByteBufferUtil.debugAll(buffer);
int oldLimit = buffer.limit();
for (int i = 0; i < oldLimit; i++) {
byte b = buffer.get(i); // 不会改变buffer的 position 指针
if (b == '\n') {
System.out.println(buffer.position());
ByteBuffer target = ByteBuffer.allocate(i - buffer.position() + 1);
buffer.limit(i + 1); // 右开区间 limit取不到 , 读的时候 position = limit 时 就不会再读了
target.put(buffer);// 相当于读buffer position ++, 会改变position 指针
ByteBufferUtil.debugAll(target);
buffer.limit(oldLimit);// 还原limit 指针位置, 方便继续读
}
}
buffer.compact();// compact操作解决半包 现象, 未读的字节放到 buffer 首部
}
}

网络编程

阻塞与非阻塞

阻塞

  • 阻塞

    在阻塞模式下,所有的方法会阻塞当前线程继续运行,当前线程将会暂停。

    • ServerSocketChannel.accept() 方法会阻塞线程运行,只有新的连接建立后才会让线程继续运行
    • SocketChannel.read() 方法当读不到channel数据时也会阻塞

    阻塞的特点就是暂停当前线程,该线程不会占用cpu,该线程也不能去做其他事情

  • 单线程

    在单线程下阻塞方法之间相互影响,几乎不能正常运行,因此需要使用多线程

  • 多线程

    不同位数的jvm 每个线程占用的内存也不同, 32位的jvm 一个线程 320k, 64位的jvm一个线程 1024k,如果每个连接用一个线程处理 ,连接过多就会导致oom,而且线程过多,会因为频繁的上下文切换导致性能降低。

    引入线程池后虽然能减少线程的上下文切换,但是如果建立过多的长连接也会导致线程池当中的线程阻塞,因此只适合处理短链接

client

1
2
3
4
5
6
7
public class Client {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
boolean connected = socketChannel.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
}
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class Server {
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(16);
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {

ssc.bind(new InetSocketAddress(8080));
boolean blocking = ssc.isBlocking();
System.out.println(blocking);
List<SocketChannel> channels = new ArrayList<>();
while (true) {
log.info("connecting.......");
SocketChannel channel = ssc.accept(); // 阻塞 线程停止运行
log.info("connected");
channels.add(channel);

for (SocketChannel socketChannel : channels) {
log.info("before read ...{}", socketChannel);
socketChannel.read(buffer);// 阻塞方法
buffer.flip();
ByteBufferUtil.debugAll(buffer);
buffer.clear();
log.debug("after read ... {}", socketChannel);
}
}
}

}
}

非阻塞

  • 非阻塞

    非阻塞模式下所有的方法都不会阻塞线程的运行

    • ServerSocketChannel.accept() 方法 无连接建立时返回 null,然后继续运行
    • SocketChannel.read() 方法读不到数据时返回0,继续运行
    • 写数据时,线程只等待将数据写入channel,无需channel通过网络将数据发送出去
  • 非阻塞模式下,由于无连接建立,以及无数据可读时线程还是在不断的运行,会白白的浪费cpu资源

  • 数据在复制的过程中仍然时阻塞的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class ServerNonBlocking {
public static void main(String[] args) throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(16);
try (ServerSocketChannel ssc = ServerSocketChannel.open()) {

ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boolean blocking = ssc.isBlocking();
System.out.println(blocking);
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// log.info("connecting.......");
SocketChannel channel = ssc.accept(); // 非阻塞 没有建立连接时返回null
if (channel != null) {
log.info("connected");
channel.configureBlocking(false);
channels.add(channel);
}

for (SocketChannel socketChannel : channels) {
// log.info("before read ...{}", socketChannel);
int read = socketChannel.read(buffer);// 非阻塞 无 数据时 返回 0
if(read > 0) {
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
log.debug("after read ... {}", socketChannel);
}
}
}
}
}
}

多路复用

单线程通过Selector可以防止非阻塞模式下线程一直运行,Selector可以监控多个Channel的读写事件

  • 多路复用只能用于网络IO,文件IO无法多路复用
  • 通过Selector可以实现事件触发时才唤醒线程进行处理
    • 有连接事件时去处理连接事件
    • 有读事件时去处理读事件
    • 有写事件时去处理写事件

优点

  • 事件触发,防止线程做无用功,白白占用cpu
  • 让一个线程充分的被利用
  • 节约线程的数量
  • 可以减少线程的上下文切换

Selector的创建

1
Selector selector = Selector.open()

Channel绑定Selector

1
2
socketchannel.configureBlocking(false);
socketchannel.register(selector, SelectionKeys.OPS_ACCEPT)
  • channel需使用非阻塞模式
  • fileChannel无非阻塞模式,所以不能和Selector一起使用
  • 可绑定的事件类型
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

Channel事件监听

1.阻塞直到事件发送

1
int count = selector.select()

2.阻塞到事件发送或者超时

1
int count = selector.select(long timeout)

3.不阻塞

1
int count = selector.selectNow()

Selector 不阻塞继续运行的情况

  • 事件发生
  • 调用selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

处理accept 和 read 事件

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class ServerSelector {
public static void main(String[] args) throws Exception {

try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select(); // 阻塞当前线程 等待注册channel的事件发生 count 时 注册的key的数量
log.debug("select count: {}", count);

// 获取select 注册的所有的 key
Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 事件必须处理否则下次还会触发
SocketChannel sc = c.accept();
log.debug("{}",sc);
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
try {
ByteBuffer buffer = ByteBuffer.allocate(128);
SocketChannel c = (SocketChannel) key.channel();
int read = c.read(buffer);
System.out.println(read);
if (read == -1) { // 客户端正常关闭时 返回 -1
log.debug("end of the stream key cancel");
key.cancel();
c.close();
} else {
buffer.flip();
ByteBufferUtil.debugRead(buffer);
buffer.clear();
}
} catch (IOException e) {
// 防止客户端不正常关闭
key.cancel(); // 取消注册在 selector 上的事件 并且 将该key 从 key set 中移除
}
}
// 处理完毕必须将事件移除
iterator.remove();
}
}

} catch (Exception e) {
e.printStackTrace();
}
}
}

注意

1.事件处理完成之后必须将该事件从 selectkeys set中移除,否则下次循环还会进入该事件的分支当中

​ 因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,仍然会进入key.isaccept的处理逻辑,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

2.事件发生后必须处理,如果不处理该事件会不断触发

3.调用Key.cancel后会取消注册在selector上的channel 并且 将key 从keyset中移除

处理边界问题

由于ByteBuffer的大小是固定的,而消息的长度可长可短,因此发送的消息在被bytebuffer接收时很有可能是不完整的,或者多条消息粘连在一起。

11081d4f-81fb-4dd3-9750-cbb07c3a60e4

因此需要对边界问题进行处理,有以下几种方式解决:

  • 客户端和服务器按照约定,按照固定长度收发数据,但是这样会浪费带宽
  • 另外一种方式就是约定消息的分隔符,按照分隔符进行分割,缺点是效率比较低
  • TLV格式,Type、Length、Value,一条消息包含类型和长度信息,类型和长度信息的大小可以先约定好,这样就可以很方便获取消息的大小,分配合适的buffer。缺点是buffer需要提前分配,影响server的吞吐量
    • HTTP 1.1 是TLV格式
    • HTTP 2.0 是LTV格式

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Slf4j
public class ServerSelector {
public static void main(String[] args) throws Exception {

try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
int count = selector.select(); // 阻塞当前线程 等待注册channel的事件发生 count 时 注册的key的数量
log.debug("select count: {}", count);

// 获取select 注册的所有的 key
Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 事件必须处理否则下次还会触发
SocketChannel sc = c.accept();
log.debug("{}",sc);
sc.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(16); // buffer 不能共用,所以一个channel分配一个buf
sc.register(selector, SelectionKey.OP_READ, buf);
} else if (key.isReadable()) {
try {
SocketChannel c = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = c.read(buffer);
if (read == -1) { // 客户端正常关闭时 返回 -1
log.debug("end of the stream key cancel");
key.cancel();
c.close();
} else {
split(buffer);// 按照/n分隔符打印数据, 如果发生数据截断,会再触发一次读事件,这时候需要对buffer扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newByteBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newByteBuffer.put(buffer);
key.attach(newByteBuffer);
}
}
} catch (IOException e) {
// 防止客户端不正常关闭
key.cancel(); // 取消注册在 selector 上的事件 并且 将该key 从 key set 中移除
}
}
// 处理完毕必须将事件移除
iterator.remove();
}
}

} catch (Exception e) {
e.printStackTrace();
}
}

public static void split(ByteBuffer buffer) {
buffer.flip();
ByteBufferUtil.debugRead(buffer);
int oldLimit = buffer.limit();
for (int i = 0; i < oldLimit; i++) {
byte b = buffer.get(i); // 不会改变buffer的 position 指针
if (b == '\n') {
System.out.println(buffer.position());
ByteBuffer target = ByteBuffer.allocate(i - buffer.position() + 1);
buffer.limit(i + 1); // 右开区间 limit取不到 , 读的时候 position = limit 时 就不会再读了
target.put(buffer);// 相当于读buffer position ++, 会改变position 指针
ByteBufferUtil.debugAll(target);
buffer.limit(oldLimit);// 还原limit 指针位置, 方便继续读
}
}
buffer.compact();// compact操作解决半包 现象, 未读的字节放到 buffer 首部
}
}

Client

1
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));

初始分配了16个字节的bytebuffer,发送的消息超过这个大小是就会将buffer扩容,上面的代码示例中采用了 分隔符处理消息的形式

  • 注意点 bytebuffer的分配
    • 每个channel应该单独分配一个bytebuffer,bytebuffer 不能共用,因为每个channel的消息都有可能发生截断
    • bytebuffer不能太大,假如一个bytebuffer是1m 如果需要支持百万连接,内存就需要1Tb的内存
      • 可以先分配一个初始大小的buffer例如4k, 不够用的时候在分配一个新的buffer,cap翻一倍,优点是消息连续容易处理,缺点是会拷贝消耗性能
      • 也可以通过bytebuffer数组来实现,一个数组不够用就把多出来的内容写入新的buffer,但是这样消息不连续,解析很复杂,优点是避免了拷贝,降低了性能损耗

处理write事件

底层的socket缓冲区是有一定的大小的,buffer中的数据可能不能一次写入到channel中,而只要向channel中写数据就会触发write事件,因此只有当buffer中数据还有剩余是才需要监听write事件,当数据写完时再取消write事件的监听

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Slf4j
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);

Selector selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
selector.select();

Set<SelectionKey> keys = selector.selectedKeys();

Iterator<SelectionKey> iterator = keys.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();

if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
ByteBuffer buffer = Charset.defaultCharset().encode("1234");
int write = sc.write(buffer);
log.debug("actual write bytes num {}", write);
if (buffer.hasRemaining()) {
sc.register(selector, SelectionKey.OP_WRITE, buffer);
}
} else if (key.isWritable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
int write = sc.write(buffer);
log.debug("actual write bytes num {}", write);
if (!buffer.hasRemaining()) {
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
iterator.remove();
}

}

}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class WriteServerClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();

socketChannel.register(selector, (SelectionKey.OP_CONNECT | SelectionKey.OP_READ));

socketChannel.connect(new InetSocketAddress("localhost", 8080));

int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isConnectable()) {
log.debug("connected");
System.out.println(socketChannel.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += socketChannel.read(buffer);
buffer.clear();
System.out.println(count);
}
}

}

}
}

多线程优化

通过多线程可以对代码进一步优化,使用一个线程作为boss线程专门处理accept事件,使用与cpu核心数相同的worker线程轮流处理read事件

e6c99379-9d1f-4f33-9901-4dad75c1af46

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Slf4j
public class ServerSelectorWithMutiThread {
public static void main(String[] args) throws Exception {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
WorkerEventLoop[] workers = new WorkerEventLoop[2];
for (int i = 0; i < workers.length; i++) {
workers[i] = new WorkerEventLoop(i);
}
AtomicInteger cnt = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected {}", sc);
workers[cnt.getAndIncrement() % workers.length].register(sc);
}
}
}
}

public static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;

private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
public WorkerEventLoop(int index) {
this.index = index;
}

public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start =true;
}
queue.add(()-> {
try {
log.debug("sc.register select");
sc.register(worker, SelectionKey.OP_READ, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
worker.wakeup();
}

@Override
public void run() {
while (true) {
try {
log.debug("worker.select()");
worker.select();
Runnable task = queue.poll();
if (task != null) {
task.run();
}
Iterator<SelectionKey> iterator = worker.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.info("channel {}", channel.getRemoteAddress());
int read = channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}

对上面代码的boss 进行封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package nio.c2;

import lombok.extern.slf4j.Slf4j;
import util.ByteBufferUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class ServerSelectorWithMutiThread {
public static void main(String[] args) throws Exception {
BossEventLoop bossEventLoop = new BossEventLoop();
bossEventLoop.register();
}

public static class BossEventLoop implements Runnable {
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
private AtomicInteger cnt;

public void register() throws IOException {
if (!start) {
cnt = new AtomicInteger();
// 初始化server socket channel
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// channel绑定socket
boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
// channel 绑定端口
ssc.bind(new InetSocketAddress(8080));
// 初始化worker
workers = new WorkerEventLoop[2];
for (int i = 0; i < workers.length; i++) {
workers[i] = new WorkerEventLoop(i);
}
// 启动boss线程
Thread bossThread = new Thread(this, "boss");
// bossThread.setDaemon(true); // 守护线程会在主线程结束后同样结束
bossThread.start();
start = true;
}
}

@Override
public void run() {
while (true) {
try {
log.debug(" is daemon {}", Thread.currentThread().isDaemon());
boss.select();
Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected {}", sc);
workers[cnt.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

public static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;

private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
public WorkerEventLoop(int index) {
this.index = index;
}

public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start =true;
}
// 执行这个方法的线程是 boss 线程 在boss线程中操作 sc.resigster 绑定 woker的selector会有问题,
// 这里将绑定woker selector的操作利用队列放入到worker线程中进行
queue.add(()-> {
try {
log.debug("sc.register select");
sc.register(worker, SelectionKey.OP_READ, null);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
worker.wakeup();// worker线程启动后就会在select()方法处阻塞, 为了将channel绑定到worker的selecter 需要 调用wakeup方法
}

@Override
public void run() {
while (true) {
try {
log.debug("worker.select()");
worker.select();
Runnable task = queue.poll();
if (task != null) {
task.run();
}
Iterator<SelectionKey> iterator = worker.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.info("channel {}", channel.getRemoteAddress());
int read = channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}

获取cpu的核数

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

UDP

  • UDP是无连接的协议,client发送数据不需要管Server是否开启
  • Server端的receive方法会将接收到的数据存入bytebuffer,但是如果数据报文超过bytebuffer的大小就会将多出来的数据抛弃

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
public class UDPServer {
public static void main(String[] args) throws IOException {
// 开启udp channel
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(8081));
System.out.println("waiting.....");

// 绑定Selector
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(32));

while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
DatagramChannel dc = (DatagramChannel) key.channel();
dc.receive(buffer);
ByteBufferUtil.debugAll(buffer);
}
}
}
}
}

client

1
2
3
4
5
6
7
8
9
10
11
@Slf4j
public class UDPClient {
public static void main(String[] args) throws IOException {
DatagramChannel channel = DatagramChannel.open();

ByteBuffer buffer = StandardCharsets.UTF_8.encode("你好");
int send = channel.send(buffer, new InetSocketAddress("localhost",8081));

log.debug("debug: {}", send);
}
}

NIO & BIO

stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO模型

io模型可以分为:

  • 同步阻塞
  • 同步非阻塞
  • 同步多路复用
  • 异步非阻塞

同步:

只用一个线程,该线程自己去获取结果

异步:

线程自己不去获取结果,由另外的线程获取结果,至少两个线程

当调用一次channel.read或stream.read后会切换到操作系统的内核态,来实现数据读取,分别为:

  • 等待数据阶段
  • 复制数据阶段

  • 阻塞 IO

  • 非阻塞 IO

  • 多路复用

  • 异步 IO

  • 阻塞 IO vs 多路复用

零拷贝

传统IO问题

传统IO是将磁盘上的一个文件通过socket写出,会经过内核缓冲区、用户缓冲区、socket缓冲区再到网卡

1
2
3
4
5
6
7
8
File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);
  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

内部工作流程是这样的:

3fa95401-d36e-47a4-968e-ea43c81f0196

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

通过 DirectByteBuf

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

1eaad5ab-250d-4677-ad62-517647e68dd3

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用 transferTo/transferFrom 方法拷贝数据

fdbad514-da93-470a-b36c-f4446c506bbc

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4)

28a9b6d1-32fc-447f-8576-ce11b8c3a1ec

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件AIO

1

网络AIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package nio.c4;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;

@Slf4j
public class AioServer {

public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel asc = AsynchronousServerSocketChannel.open();
asc.bind(new InetSocketAddress(8080));
asc.accept(null, new AcceptHandler(asc));
System.in.read();
}

private static void closeChannel(AsynchronousSocketChannel sc) {
try {
System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}

private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

public ReadHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
try {
if (result == -1) {
closeChannel(sc);
return;
}
System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
attachment.flip();
System.out.println(Charset.defaultCharset().decode(attachment));
attachment.clear();
// 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
sc.read(attachment, attachment, this);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
closeChannel(sc);
exc.printStackTrace();
}
}

private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel sc;

private WriteHandler(AsynchronousSocketChannel sc) {
this.sc = sc;
}

@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
if (attachment.hasRemaining()) {
sc.write(attachment);
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
closeChannel(sc);
}
}

private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
private final AsynchronousServerSocketChannel ssc;

public AcceptHandler(AsynchronousServerSocketChannel ssc) {
this.ssc = ssc;
}

@Override
public void completed(AsynchronousSocketChannel sc, Object attachment) {
try {
System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
} catch (IOException e) {
throw new RuntimeException(e);
}
ByteBuffer buffer = ByteBuffer.allocate(1);
// 读事件由 ReadHandler 处理
sc.read(buffer, buffer, new ReadHandler(sc));
// 写事件由 WriteHandler 处理
sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
// 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
ssc.accept(null, this);
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
}