Java NIO(New IO)是从 Java 1.4 版本开始引入的一个新的 IO API,可以替代标准的 Java IO API。NIO 与原来的 IO 有同样的作用和目的,但是使用方式完全不同,NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文件的读写操作。

1.基本介绍

在1.4版本之前,JavaIO类库是阻塞式IO;从1.4版本开始,引进了新的异步IO库,被称为Java New IO类库,简称为Java NIO。

Java NIO类库的目标,就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO),称“老的”阻塞式Java IO为OIO(Old IO)。总体上说,NIO弥补了原来面向流的OIO同步阻塞的不足,它为标准Java代码提供了高速的、面向缓冲区的IO。

Java NIO类库包含以下三个核心组件:

Channel(通道)

Buffer(缓冲区)

Selector(选择器)

Java NIO,属于第三种模型—— IO 多路复用模型。只不过,JavaNIO组件提供了统一的应用开发API,为大家屏蔽了底层的操作系统的差异。

NIO和OIO的对比

在Java中,NIO和OIO的区别,主要体现在三个方面:

(1)OIO是面向流(Stream Oriented)的,NIO是面向缓冲区(Buffer Oriented)的。

问题是:什么是面向流,什么是面向缓冲区呢?

在面向流的OIO操作中,IO的 read() 操作总是以流式的方式顺序地从一个流(Stream)中读取一个或多个字节,因此,我们不能随意地改变读取指针的位置,也不能前后移动流中的数据。

而NIO中引入了Channel(通道)和Buffer(缓冲区)的概念。面向缓冲区的读取和写入,都是与Buffer进行交互。用户程序只需要从通道中读取数据到缓冲区中,或将数据从缓冲区中写入到通道中。NIO不像OIO那样是顺序操作,可以随意地读取Buffer中任意位置的数据,可以随意修改Buffer中任意位置的数据。

(2)OIO的操作是阻塞的,而NIO的操作是非阻塞的。

OIO的操作是阻塞的,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。例如,我们调用一个read方法读取一个文件的内容,那么调用read的线程会被阻塞住,直到read操作完成。

NIO如何做到非阻塞的呢?当我们调用read方法时,系统底层已经把数据准备好了,应用程序只需要从通道把数据复制到Buffer(缓冲区)就行;如果没有数据,当前线程可以去干别的事情,不需要进行阻塞等待。

NIO的非阻塞是如何做到的呢?

其实在上一章,答案已经揭晓了,根本原因是:NIO使用了通道和通道的IO多路复用技术。

(3)OIO没有选择器(Selector)概念,而NIO有选择器的概念。

NIO技术的实现,是基于底层的IO多路复用技术实现的,比如在Windows中需要select多路复用组件的支持,在Linux系统中需要select/poll/epoll多路复用组件的支持。所以NIO的需要底层操作系统提供支持。而OIO不需要用到选择器。

通道(Channel)
前面提到,Java NIO类库包含以下三个核心组件:

Channel(通道)

Buffer(缓冲区)

Selector(选择器)

首先说一下Channel,国内大多翻译成“通道”。Channel的角色和OIO中的Stream(流)是差不多的。在OIO中,同一个网络连接会关联到两个流:一个输入流(Input Stream),另一个输出流(Output Stream),Java应用程序通过这两个流,不断地进行输入和输出的操作。

在NIO中,一个网络连接使用一个通道表示,所有的NIO的IO操作都是通过连接通道完成的。一个通道类似于OIO中的两个流的结合体,既可以从通道读取数据,也可以向通道写入数据。

Channel和Stream的一个显著的不同是:Stream是单向的,譬如InputStream是单向的只读流,OutputStream是单向的只写流;而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。

NIO中的Channel的主要实现有:

1.FileChannel 用于文件IO操作

2.DatagramChannel 用于UDP的IO操作

3.SocketChannel 用于TCP的传输操作

4.ServerSocketChannel 用于TCP连接监听操作

选择器(Selector)
首先,回顾一个前面介绍的基础知识,什么是IO多路复用模型?

IO多路复用指的是一个进程/线程可以同时监视多个文件描述符(含socket连接),一旦其中的一个或者多个文件描述符可读或者可写,该监听进程/线程能够进行IO事件的查询。

在Java应用层面,如何实现对多个文件描述符的监视呢?

需要用到一个非常重要的Java NIO组件——Selector 选择器。Selector选择器可以理解为一个IO事件的监听与查询器。通过选择器,一个线程可以查询多个通道的IO事件的就绪状态。

2.byteBuffer

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.110.Final</version>
</dependency>

1.bytebuffer的基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args){
//FileChannel
//1.输入输出流
try (FileChannel channel = new FileInputStream("data.txt").getChannel()){
ByteBuffer buffer = ByteBuffer.allocate(512);

//从channel导入并且向buffer导入
channel.read(buffer);
//打印出buffer的内容
buffer.flip(); //切换到读模式
ArrayList<Byte> bytes =new ArrayList<Byte>();
while (buffer.hasRemaining()){
byte b = buffer.get();
bytes.add(b);
}
System.out.println(bytes);
//切换到写模式
buffer.clear();
}catch (Exception e){
e.printStackTrace();
}
}

2.ByteBuffer的结构

capacity 容量

position 读写的位置

limit 读写的限制

compact方法 切换至写模式

1
2
3
4
public static void main(String[] args){
System.out.println(ByteBuffer.allocate(16).getClass());
System.out.println(ByteBuffer.allocateDirect(16).getClass());
}

class java.nio.HeapByteBuffer 堆内存 读写效率较低,收到GC的影响
class java.nio.DirectByteBuffer 直接内存 读写效率较高(少一层数据的拷贝),不受GC的影响

3.读取方法

buffer.get() 读取

rewind() 从头开始读

mark 做一个标记,记录position的位置,reset是将position重置到mark的位置

get(i) 按索引查找

4.字符串转换

“”.getBytes()

1
2
ByteBuffer buffer = ByteBuffer.allocate(16);
ByteBuffer bytes = buffer.put("hello".getBytes());

ByteBuffer.warp()

1
ByteBuffer buffer1 = ByteBuffer.wrap("hello".getBytes());

将buffer转为字符串

1
2
3
4
5
ByteBuffer buffer1 = ByteBuffer.wrap("hello".getBytes());
//buffer1.flip();

String decode = StandardCharsets.UTF_8.decode(buffer1).toString();
System.out.println(decode);

5.Scattering Reads

分散读取

1
2
3
4
5
6
ByteBuffer b1 = ByteBuffer.allocate(4);
ByteBuffer b2 = ByteBuffer.allocate(4);
ByteBuffer b3 = ByteBuffer.allocate(6);
b1.flip();
b2.flip();
b3.flip();

6.处理粘包半包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
ByteBuffer source = ByteBuffer.allocate(32);
source.put("Hello,world\nI'm Zhangsan\nHo".getBytes());
split(source);
source.put("w are you?\n".getBytes());
split(source);
}

public static void split(ByteBuffer source){
source.flip();
for (int i=0;i<source.limit();i++){
if (source.get(i) == '\n'){
int len = i + 1 - source.position();
ByteBuffer target = ByteBuffer.allocate(len);

for (int j = 0;j<len;j++){
target.put(source.get());
}
}
}
source.compact();
}

3.FileChannel

FileChannel只工作在阻塞模式下

1.transferTo

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
try (
FileChannel from = new FileInputStream("data.txt").getChannel();
FileChannel to = new FileOutputStream("data1.txt").getChannel();
){
from.transferTo(0,4096,to);
}catch (Exception e){
e.printStackTrace();
}
}

如果传输数据大于2g 使用from.size()获取剩余的大小

2.Path

Path和Paths类 JDK7引入

1.walkFileTree

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws IOException {
Files.walkFileTree(Path.of("D:\\BaiduNetdiskDownload"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file);
return super.visitFile(file, attrs);
}

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
System.out.println(dir);
return super.preVisitDirectory(dir, attrs);
}
});
}

2.删除多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws IOException {
Files.walkFileTree(Path.of("D:\\BaiduNetdiskDownload"),new SimpleFileVisitor<Path>(){
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return super.postVisitDirectory(dir, exc);
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return super.visitFile(file, attrs);
}
});
}

3.拷贝多级目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String from = "D:\\BaiduNetdiskDownload";
String to = "D:\\BaiduNetdiskDownload";

Files.walk(Paths.get(from)).forEach(path -> {
try {
String targetName = path.toString().replace(from,to);
if (Files.isDirectory(path)){
Files.createDirectory(Paths.get(targetName));
}else {
Files.copy(path, Paths.get(targetName));
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});

4.网络编程

1.阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TestNetServer {

public static void main(String[] args) throws IOException {
//创建服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ByteBuffer buffer = ByteBuffer.allocate(10);
//绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));

List<SocketChannel> channels = new ArrayList<>();
while (true) {
//accept 建立连接,进行通信
SocketChannel accept = serverSocketChannel.accept(); //阻塞方法
channels.add(accept);

for (SocketChannel sc : channels){
sc.read(buffer);
buffer.flip();

while(buffer.hasRemaining()){
System.out.println(buffer.get());
}

buffer.clear();
}
}
}
}
1
2
3
4
5
6
7
8
public class TestNetClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",8080));
socketChannel.write(Charset.defaultCharset().encode("test"));
System.out.println("waiting");
}
}

2.非阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) throws IOException {
//创建服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ByteBuffer buffer = ByteBuffer.allocate(10);
//绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false); //非阻塞模式
List<SocketChannel> channels = new ArrayList<>();
while (true) {
//accept 建立连接,进行通信
SocketChannel accept = serverSocketChannel.accept(); //非阻塞

accept.configureBlocking(false); //配置为非阻塞
channels.add(accept); //非阻塞

for (SocketChannel sc : channels){
sc.read(buffer);
buffer.flip();

while(buffer.hasRemaining()){
System.out.println(buffer.get());
}

buffer.clear();
}
}
}

3.selector

事件的类型

  • accept 会在有链接请求时触发
  • connect 是客户端,连接建立后触发
  • read 可读事件
  • write 可写事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static void main(String[] args) throws IOException {
//1. 创建Selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); //非阻塞模式

//2. SelectionKey事件发生后,通过它可以知道事件和哪个channel发生了事件
SelectionKey Skey = serverSocketChannel.register(selector, 0, null);
//只关注accept事件
Skey.interestOps(SelectionKey.OP_ACCEPT);
//绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));
while (true) {
//3. selector方法 没有事件发生会阻塞
selector.select();
//4 处理事件 内部包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()){
SelectionKey key = iterator.next();

//区分事件类型
if (key.isAcceptable()){
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey SSkey = serverSocketChannel.register(selector, 0, null);
SSkey.interestOps(SelectionKey.OP_READ);
}else if (key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
while(buffer.hasRemaining()){
System.out.println(buffer.get());
}
}
}
}
}

处理空指针异常问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void main(String[] args) throws IOException {
// 1. 创建Selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 非阻塞模式

// 2. 绑定监听端口
serverSocketChannel.bind(new InetSocketAddress(8080));

// 3. 注册通道到selector
SelectionKey Skey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
// 4. selector方法 没有事件发生会阻塞
selector.select();
// 5. 处理事件 内部包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// 6. 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
// 7. 注册新连接的SocketChannel
SelectionKey SSkey = sc.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024); // 调整缓冲区大小
int bytesRead = channel.read(buffer);
if (bytesRead == -1) {
channel.close(); // 读取完毕,关闭通道
continue;
}
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
buffer.clear(); // 清空缓冲区,准备下一次读取
}
}
}
}

4.处理消息边界

LTV格式,Type类型 Length长度 Value数据 在类型和长度已知的情况下方便获取大小

http 2.0 是LTV格式

5.处理写消息过多的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

while (iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()){
SocketChannel accept = ssc.accept();
accept.configureBlocking(false);

StringBuilder stringBuilder = new StringBuilder();
for (int i=0;i<1000000;i++){
stringBuilder.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString());
while (buffer.hasRemaining()){
int write = accept.write(buffer);
System.out.println(write);
}
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",8080));

int count = 0;
while (true){
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += socketChannel.read(buffer);
System.out.println(count);
buffer.clear();
}
}

输出

1
2
3
4
5
6
7
8
131071
262142
393213
524284
655355
786426
917497
1000000

处理消息过多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);

Selector selector = Selector.open();

ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true){
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

while (iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()){
SocketChannel accept = ssc.accept();
accept.configureBlocking(false);
SelectionKey scKey = accept.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);

ByteBuffer buffer = Charset.defaultCharset().encode("a".repeat(100000000));

int write = accept.write(buffer);
System.out.println(write);

if (buffer.hasRemaining()){
//关注可写事件
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
//挂载
scKey.attach(buffer);
}
}else if (key.isReadable()) {
ByteBuffer buffer1 = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();

int write = sc.write(buffer1);
System.out.println(write);

//清理操作
if (!buffer1.hasRemaining()){
key.attach(null);
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}
}
}
}

6.IO模型

阻塞IO

非阻塞IO

多路复用

信号驱动

异步IO

1.AIO

异步IO

在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws IOException {
try(AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(
Paths.get("data.txt"), StandardOpenOption.READ)){
ByteBuffer buffer = ByteBuffer.allocate(16);

asynchronousFileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
while (attachment.hasRemaining()){
System.out.println((char)attachment.get());
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}catch (Exception e){
e.printStackTrace();
}
System.in.read();
}

5.Netty

1.Netty入门

netty是一个异步的、基于事件驱动的网络应用框架,用于开发可维护、高性能的网络服务和客户端

用服务器端向客户端发送Hello

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) {
//启动器,负责组装netty组件
new ServerBootstrap().group(new NioEventLoopGroup())
//选择服务器的ServerSocket的实现
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//配置具体handler
ch.pipeline().addLast(new StringDecoder()); //将buffer转换为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override //读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
}).bind(8080); //bind为绑定的监听端口
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
}).connect("localhost",8080)
.sync() //阻塞方法,直到连接建立
.channel() //代表连接对象
.writeAndFlush("Hello"); //发送数据
}

2.组件

1.EventLoop

事件循环对象,本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断地IO事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup(2); //io事件,普通任务,定时任务

EventLoopGroup group1 = new DefaultEventLoop(); //普通任务、定时任务
//获取下一个事件循环对象
//EventLoop next = group.next();
//执行普通任务
group.next().submit(()->{
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("ok");
});
System.out.println("main");
}
1
2
3
4
5
//执行定时任务
group.next().scheduleAtFixedRate(()->{
System.out.println("ok");
},0,1, TimeUnit.SECONDS);
System.out.println("main");

eventloop分工细化

1
2
3
new ServerBootstrap()
// boss负责accept 和 worker负责SocketChannel reade和write
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))

进一步细分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//创建一个独立的EventLoopGroup  处理耗时较长的handler
EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(2);
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//将耗时较长的处理交给独立的EventLoopGroup
ch.pipeline().addLast(eventLoopGroup,"handler1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); //将消息传给下一个handler
}
});
}
}).bind(8080);

2.Channel

channel

  • close() 关闭channel
  • closeFuture() 用来处理channel的关闭
    • sync 方法的作用是同步等待channel的关闭
    • addListener方法是异步等待channel的关闭
  • pipline() 方法添加处理器
  • write() 方法写入数据
  • writeAndFlush() 方法将数据写入并且刷出

channelFuture连接问题

1
2
3
4
5
6
7
8
9
10
11
12
13
ChannelFuture future = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
// connect方法是一个异步非阻塞的方法
// main发起了调用 真正执行的是nio线程
}).connect("localhost", 8080);
//无阻塞向下执行
future.channel().writeAndFlush("Hello");; //代表连接对象 & 发送数据

channelFuture处理结果的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//带有Future Promise的类型都是异步方法配套使用,用来处理结果
ChannelFuture future = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
// connect方法是一个异步非阻塞的方法
// main发起了调用 真正执行的是nio线程
}).connect("localhost", 8080);
//方法一:使用sync同步处理结果
/* future.sync(); //阻塞方法,直到连接建立
//无阻塞向下执行
future.channel().writeAndFlush("Hello");; //代表连接对象 & 发送数据*/

// 方法二:使用addListener异步处理结果
future.addListener(new ChannelFutureListener() {
@Override
//在 nio线程建立好连接后 执行operationComplete
public void operationComplete(ChannelFuture channelFuture) throws Exception {
future.channel().writeAndFlush("Hello");
}
});

channelFuture关闭问题

问题:channel.close是异步的,因此无法在关闭后执行关闭后的操作,可能channel会在一秒钟之后才释放

1
2
3
4
5
6
7
8
9
10
11
12
Channel channel = future.sync().channel();
new Thread(()->{
Scanner sc = new Scanner(System.in);
while (true){
String line = sc.nextLine();
if (line.equals("q")){
channel.close();
break;
}
channel.writeAndFlush(line);
}
},"input").start();

正确处理channelFuture关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
// 方法一 :获取closeFuture对象 同步处理关闭 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("等待关闭");
closeFuture.sync();
System.out.println("处理关闭后的操作");

//方法二 : 使用addListener
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("处理关闭后的操作");
}
});

当我们输入q时发现总java线程没有结束,处理该问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//带有Future Promise的类型都是异步方法配套使用,用来处理结果
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture future = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
// connect方法是一个异步非阻塞的方法
// main发起了调用 真正执行的是nio线程
}).connect("localhost", 8080);
/* //方法一:使用sync同步处理结果
future.sync(); //阻塞方法,直到连接建立
//无阻塞向下执行
future.channel().writeAndFlush("Hello");; //代表连接对象 & 发送数据*/

/*// 方法二:使用addListener异步处理结果
future.addListener(new ChannelFutureListener() {
@Override
//在 nio线程建立好连接后 执行operationComplete
public void operationComplete(ChannelFuture channelFuture) throws Exception {
future.channel().writeAndFlush("Hello");
}
});*/

Channel channel = future.sync().channel();
new Thread(()->{
Scanner sc = new Scanner(System.in);
while (true){
String line = sc.nextLine();
if (line.equals("q")){
channel.close();
break;
}
channel.writeAndFlush(line);
}
},"input").start();

/*// 方法一 :获取closeFuture对象 同步处理关闭 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("等待关闭");
closeFuture.sync();
System.out.println("处理关闭后的操作");
*/
//方法二 : 使用addListener
ChannelFuture closeFuture = channel.closeFuture();

//lamda
closeFuture.addListener((ChannelFutureListener) ChannelFuture-> {
System.out.println("处理关闭后的操作");
group.shutdownGracefully();
});
//普通模式
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("处理关闭后的操作");
group.shutdownGracefully();
}
});

3.Future & Promise

jdk-future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws ExecutionException, InterruptedException {
//线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 50;
}
});
//主线程通过future获取结果 get阻塞等待
System.out.println(future.get());
}

netty-future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();

EventLoop eventLoop = group.next();
//提交任务
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 50;
}
});
//获取结果
System.out.println(future.get());
}

异步获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();

EventLoop eventLoop = group.next();
//提交任务
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 50;
}
});
//异步获取结果(非阻塞)
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(future.getNow());
}
});
}

netty-promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建promise对象 promise==>结果的容器
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(()->{
System.out.println("开始");
try{
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
promise.setSuccess(999);
}).start();
//同步阻塞
System.out.println(promise.get());
System.out.println("结束");
}

传递错误的结果

1
2
3
4
5
6
7
8
9
10
new Thread(()->{
System.out.println("开始");
try{
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
promise.setFailure(e);
}
promise.setSuccess(999);
}).start();

4.Handler & Pipline

ChannelHandler用来处理Channel的各种事件

将消息传递给下一个handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name);
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
});

模拟出站入站

1
2
3
4
5
6
EmbeddedChannel channel = new EmbeddedChannel();

//模拟入站操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("Hello".getBytes()));
//模拟出站
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hi".getBytes()));

5.ByteBuf

1.创建
1
2
3
4
5
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
String s = "a".repeat(300);
buf.writeBytes(s.getBytes());
System.out.println(buf);

输出结果

1
2
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
2.直接内存和堆内存

堆内存

1
ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer();

直接内存(默认)

1
ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer();
3.池化和非池化

可以重用Buf

1
2
3
System.out.println(buf1.getClass());

//---》class io.netty.buffer.PooledUnsafeHeapByteBuf
4.读取

buf.readByte() 读取一个字节

1
2
3
4
5
6
7
8
9
10
//合并两个byteBuf0
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{2,3,4,5,6,7});


ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
buf1.writeBytes(new byte[]{2,3,4,5,6,7});

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20);
buffer.writeBytes(buf).writeBytes(buf1);

优化 CompositeByteBuf 避免了重复的复制

1
2
3
4
5
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{2,3,4,5,6,7});

CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
buffer.addComponents(true,buf, buf1);

6.Netty进阶

1.粘包和半包问题处理措施

1.定长解码器

1
2
3
4
5
6
7
8
9
10
11
new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//添加定长解码器
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
}).connect("localhost", 8080);

2.LineBasedFrameDecoder

1
2
3
4
5
6
7
8
9
10
11
new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//添加定长解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(10));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
}).connect("localhost", 8080);

3.LengthFieldBaseFrameDecoder

lengthFieldOffset 长度字段偏移量

lengthFiledLength 长度字段长度

lengthAdjustment 长度字段为基准,还有几个字节是内容

initialBytesToStrip 从头剥离几个字节

2.协议的设计和解析

http协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(HttpServer.class);
NioEventLoopGroup worker = new NioEventLoopGroup();
NioEventLoopGroup boss = new NioEventLoopGroup();

try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(boss,worker);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest){

}else if (msg instanceof HttpContent){

}
}
});
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
logger.info("Server started at http://127.0.0.1:{}", 8080);
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}

根据消息的类型区分SimpleChannelInboundHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(boss,worker);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
//获取请求
logger.info(msg.uri());
//返回响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
response.content().writeBytes("<h1>test<h1/>".getBytes());

//写回响应
ctx.writeAndFlush(response);
}
});
}
});

输出结果 [nioEventLoopGroup-2-1] INFO com.sun.net.httpserver.HttpServer - /index.html

1
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,128);

3.自定义协议

魔数 第一时间判定是否是有效的数据包

版本号 可以支持协议的升级

序列化算法 消息正文

指令类型 登录、注册、群聊、私聊

请求序号 为了双工通信、提供异步能力

正文长度

消息正文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Decoder extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
//魔数
out.writeBytes(new byte[]{1,2,3,4});
//字节版本
out.writeByte(1);
//字节的序列化方式 0jdk 1json
out.writeByte(0);
// 字节的指令类型
out.writeByte(2);
//序列号
out.writeByte(5);
//无意义 用来对齐
out.writeByte(0xff);

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream obs = new ObjectOutputStream(bos);
obs.writeObject(msg);
byte[] byteArray = bos.toByteArray();
//正文长度
out.writeByte(byteArray.length);
//正文
out.writeBytes(byteArray);
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

}
}

定义解码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Decoder extends ByteToMessageCodec<Message> {
final Logger logger = LoggerFactory.getLogger(Decoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
//魔数
out.writeBytes(new byte[]{1,2,3,4});
//字节版本
out.writeByte(1);
//字节的序列化方式 0jdk 1json
out.writeByte(0);
// 字节的指令类型
out.writeByte(2);
//序列号
out.writeByte(5);
//无意义 用来对齐
out.writeByte(0xff);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream obs = new ObjectOutputStream(bos);
obs.writeObject(msg);
byte[] byteArray = bos.toByteArray();
//正文长度
out.writeByte(byteArray.length);
//正文
out.writeBytes(byteArray);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {
int magicMsg = in.readInt();
byte version = in.readByte();
byte serializeType = in.readByte();

int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.writeBytes(bytes,0,length);
//假设为jdk
if (serializeType == 0){
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis);
Message msg = (Message)ois.readObject();
logger.info(msg.toString());
list.add(msg);
}
}
}

7.聊天业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package richuff.top;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChatServer{
static Logger logger = LoggerFactory.getLogger(ChatServer.class);
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(boss,worker);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,0,0,0));

}
});
bootstrap.bind(8080).sync().channel();
}catch (InterruptedException e){
logger.info(String.valueOf(e));
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}