Java NIO(New IO)是从 Java 1.4 版本开始引入的一个新的 IO API,可以替代标准的 Java IO API。NIO 与原来的 IO 有同样的作用和目的,但是使用方式完全不同,NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文件的读写操作。
1.基本介绍 在1.4版本之前,JavaIO类库是阻塞式IO;从1.4版本开始,引进了新的异步IO库,被称为Java New IO类库,简称为Java NIO。
Java NIO类库的目标,就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO),称“老的”阻塞式Java IO为OIO(Old IO)。总体上说,NIO弥补了原来面向流的OIO同步阻塞的不足,它为标准Java代码提供了高速的、面向缓冲区的IO。
Java NIO类库包含以下三个核心组件:
Channel(通道)
Buffer(缓冲区)
Selector(选择器)
Java NIO,属于第三种模型—— IO 多路复用模型。只不过,JavaNIO组件提供了统一的应用开发API,为大家屏蔽了底层的操作系统的差异。
NIO和OIO的对比
在Java中,NIO和OIO的区别,主要体现在三个方面:
(1)OIO是面向流(Stream Oriented)的,NIO是面向缓冲区(Buffer Oriented)的。
问题是:什么是面向流,什么是面向缓冲区呢?
在面向流的OIO操作中,IO的 read() 操作总是以流式的方式顺序地从一个流(Stream)中读取一个或多个字节,因此,我们不能随意地改变读取指针的位置,也不能前后移动流中的数据。
而NIO中引入了Channel(通道)和Buffer(缓冲区)的概念。面向缓冲区的读取和写入,都是与Buffer进行交互。用户程序只需要从通道中读取数据到缓冲区中,或将数据从缓冲区中写入到通道中。NIO不像OIO那样是顺序操作,可以随意地读取Buffer中任意位置的数据,可以随意修改Buffer中任意位置的数据。
(2)OIO的操作是阻塞的,而NIO的操作是非阻塞的。
OIO的操作是阻塞的,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。例如,我们调用一个read方法读取一个文件的内容,那么调用read的线程会被阻塞住,直到read操作完成。
NIO如何做到非阻塞的呢?当我们调用read方法时,系统底层已经把数据准备好了,应用程序只需要从通道把数据复制到Buffer(缓冲区)就行;如果没有数据,当前线程可以去干别的事情,不需要进行阻塞等待。
NIO的非阻塞是如何做到的呢?
其实在上一章,答案已经揭晓了,根本原因是:NIO使用了通道和通道的IO多路复用技术。
(3)OIO没有选择器(Selector)概念,而NIO有选择器的概念。
NIO技术的实现,是基于底层的IO多路复用技术实现的,比如在Windows中需要select多路复用组件的支持,在Linux系统中需要select/poll/epoll多路复用组件的支持。所以NIO的需要底层操作系统提供支持。而OIO不需要用到选择器。
通道(Channel) 前面提到,Java NIO类库包含以下三个核心组件:
Channel(通道)
Buffer(缓冲区)
Selector(选择器)
首先说一下Channel,国内大多翻译成“通道”。Channel的角色和OIO中的Stream(流)是差不多的。在OIO中,同一个网络连接会关联到两个流:一个输入流(Input Stream),另一个输出流(Output Stream),Java应用程序通过这两个流,不断地进行输入和输出的操作。
在NIO中,一个网络连接使用一个通道表示,所有的NIO的IO操作都是通过连接通道完成的。一个通道类似于OIO中的两个流的结合体,既可以从通道读取数据,也可以向通道写入数据。
Channel和Stream的一个显著的不同是:Stream是单向的,譬如InputStream是单向的只读流,OutputStream是单向的只写流;而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel的主要实现有:
1.FileChannel 用于文件IO操作
2.DatagramChannel 用于UDP的IO操作
3.SocketChannel 用于TCP的传输操作
4.ServerSocketChannel 用于TCP连接监听操作
选择器(Selector) 首先,回顾一个前面介绍的基础知识,什么是IO多路复用模型?
IO多路复用指的是一个进程/线程可以同时监视多个文件描述符(含socket连接),一旦其中的一个或者多个文件描述符可读或者可写,该监听进程/线程能够进行IO事件的查询。
在Java应用层面,如何实现对多个文件描述符的监视呢?
需要用到一个非常重要的Java NIO组件——Selector 选择器。Selector选择器可以理解为一个IO事件的监听与查询器。通过选择器,一个线程可以查询多个通道的IO事件的就绪状态。
2.byteBuffer 1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.110.Final</version > </dependency >
1.bytebuffer的基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) { try (FileChannel channel = new FileInputStream ("data.txt" ).getChannel()){ ByteBuffer buffer = ByteBuffer.allocate(512 ); channel.read(buffer); buffer.flip(); ArrayList<Byte> bytes =new ArrayList <Byte>(); while (buffer.hasRemaining()){ byte b = buffer.get(); bytes.add(b); } System.out.println(bytes); buffer.clear(); }catch (Exception e){ e.printStackTrace(); } }
2.ByteBuffer的结构
capacity 容量
position 读写的位置
limit 读写的限制
compact方法 切换至写模式
1 2 3 4 public static void main (String[] args) { System.out.println(ByteBuffer.allocate(16 ).getClass()); System.out.println(ByteBuffer.allocateDirect(16 ).getClass()); }
class java.nio.HeapByteBuffer 堆内存 读写效率较低,收到GC的影响 class java.nio.DirectByteBuffer 直接内存 读写效率较高(少一层数据的拷贝),不受GC的影响
3.读取方法
buffer.get() 读取
rewind() 从头开始读
mark 做一个标记,记录position的位置,reset是将position重置到mark的位置
get(i) 按索引查找
4.字符串转换
“”.getBytes()
1 2 ByteBuffer buffer = ByteBuffer.allocate(16 );ByteBuffer bytes = buffer.put("hello" .getBytes());
ByteBuffer.warp()
1 ByteBuffer buffer1 = ByteBuffer.wrap("hello" .getBytes());
将buffer转为字符串
1 2 3 4 5 ByteBuffer buffer1 = ByteBuffer.wrap("hello" .getBytes());String decode = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(decode);
5.Scattering Reads
分散读取
1 2 3 4 5 6 ByteBuffer b1 = ByteBuffer.allocate(4 );ByteBuffer b2 = ByteBuffer.allocate(4 );ByteBuffer b3 = ByteBuffer.allocate(6 );b1.flip(); b2.flip(); b3.flip();
6.处理粘包半包 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) { ByteBuffer source = ByteBuffer.allocate(32 ); source.put("Hello,world\nI'm Zhangsan\nHo" .getBytes()); split(source); source.put("w are you?\n" .getBytes()); split(source); } public static void split (ByteBuffer source) { source.flip(); for (int i=0 ;i<source.limit();i++){ if (source.get(i) == '\n' ){ int len = i + 1 - source.position(); ByteBuffer target = ByteBuffer.allocate(len); for (int j = 0 ;j<len;j++){ target.put(source.get()); } } } source.compact(); }
3.FileChannel
FileChannel只工作在阻塞模式下
1.transferTo 1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { try ( FileChannel from = new FileInputStream ("data.txt" ).getChannel(); FileChannel to = new FileOutputStream ("data1.txt" ).getChannel(); ){ from.transferTo(0 ,4096 ,to); }catch (Exception e){ e.printStackTrace(); } }
如果传输数据大于2g 使用from.size()获取剩余的大小
2.Path
Path和Paths类 JDK7引入
1.walkFileTree 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws IOException { Files.walkFileTree(Path.of("D:\\BaiduNetdiskDownload" ),new SimpleFileVisitor <Path>(){ @Override public FileVisitResult visitFile (Path file, BasicFileAttributes attrs) throws IOException { System.out.println(file); return super .visitFile(file, attrs); } @Override public FileVisitResult preVisitDirectory (Path dir, BasicFileAttributes attrs) throws IOException { System.out.println(dir); return super .preVisitDirectory(dir, attrs); } }); }
2.删除多级目录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws IOException { Files.walkFileTree(Path.of("D:\\BaiduNetdiskDownload" ),new SimpleFileVisitor <Path>(){ @Override public FileVisitResult postVisitDirectory (Path dir, IOException exc) throws IOException { Files.delete(dir); return super .postVisitDirectory(dir, exc); } @Override public FileVisitResult visitFile (Path file, BasicFileAttributes attrs) throws IOException { Files.delete(file); return super .visitFile(file, attrs); } }); }
3.拷贝多级目录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 String from = "D:\\BaiduNetdiskDownload" ;String to = "D:\\BaiduNetdiskDownload" ;Files.walk(Paths.get(from)).forEach(path -> { try { String targetName = path.toString().replace(from,to); if (Files.isDirectory(path)){ Files.createDirectory(Paths.get(targetName)); }else { Files.copy(path, Paths.get(targetName)); } } catch (IOException e) { throw new RuntimeException (e); } });
4.网络编程 1.阻塞模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class TestNetServer { public static void main (String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ByteBuffer buffer = ByteBuffer.allocate(10 ); serverSocketChannel.bind(new InetSocketAddress (8080 )); List<SocketChannel> channels = new ArrayList <>(); while (true ) { SocketChannel accept = serverSocketChannel.accept(); channels.add(accept); for (SocketChannel sc : channels){ sc.read(buffer); buffer.flip(); while (buffer.hasRemaining()){ System.out.println(buffer.get()); } buffer.clear(); } } } }
1 2 3 4 5 6 7 8 public class TestNetClient { public static void main (String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress ("localhost" ,8080 )); socketChannel.write(Charset.defaultCharset().encode("test" )); System.out.println("waiting" ); } }
2.非阻塞模式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public static void main (String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ByteBuffer buffer = ByteBuffer.allocate(10 ); serverSocketChannel.bind(new InetSocketAddress (8080 )); serverSocketChannel.configureBlocking(false ); List<SocketChannel> channels = new ArrayList <>(); while (true ) { SocketChannel accept = serverSocketChannel.accept(); accept.configureBlocking(false ); channels.add(accept); for (SocketChannel sc : channels){ sc.read(buffer); buffer.flip(); while (buffer.hasRemaining()){ System.out.println(buffer.get()); } buffer.clear(); } } }
3.selector
事件的类型
accept 会在有链接请求时触发
connect 是客户端,连接建立后触发
read 可读事件
write 可写事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public static void main (String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); SelectionKey Skey = serverSocketChannel.register(selector, 0 , null ); Skey.interestOps(SelectionKey.OP_ACCEPT); serverSocketChannel.bind(new InetSocketAddress (8080 )); while (true ) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); if (key.isAcceptable()){ ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false ); SelectionKey SSkey = serverSocketChannel.register(selector, 0 , null ); SSkey.interestOps(SelectionKey.OP_READ); }else if (key.isReadable()){ SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(16 ); channel.read(buffer); buffer.flip(); while (buffer.hasRemaining()){ System.out.println(buffer.get()); } } } } }
处理空指针异常问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public static void main (String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); serverSocketChannel.bind(new InetSocketAddress (8080 )); SelectionKey Skey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true ) { selector.select(); Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false ); SelectionKey SSkey = sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024 ); int bytesRead = channel.read(buffer); if (bytesRead == -1 ) { channel.close(); continue ; } buffer.flip(); while (buffer.hasRemaining()) { System.out.print((char ) buffer.get()); } buffer.clear(); } } } }
4.处理消息边界
LTV格式,Type类型 Length长度 Value数据 在类型和长度已知的情况下方便获取大小
http 2.0 是LTV格式
5.处理写消息过多的情况 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public static void main (String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress (8080 )); while (true ){ selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()){ SocketChannel accept = ssc.accept(); accept.configureBlocking(false ); StringBuilder stringBuilder = new StringBuilder (); for (int i=0 ;i<1000000 ;i++){ stringBuilder.append("a" ); } ByteBuffer buffer = Charset.defaultCharset().encode(stringBuilder.toString()); while (buffer.hasRemaining()){ int write = accept.write(buffer); System.out.println(write); } } } } }
1 2 3 4 5 6 7 8 9 10 11 12 public static void main (String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress ("localhost" ,8080 )); int count = 0 ; while (true ){ ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 ); count += socketChannel.read(buffer); System.out.println(count); buffer.clear(); } }
输出
1 2 3 4 5 6 7 8 131071 262142 393213 524284 655355 786426 917497 1000000
处理消息过多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public static void main (String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false ); Selector selector = Selector.open(); ssc.register(selector, SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress (8080 )); while (true ){ selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()){ SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()){ SocketChannel accept = ssc.accept(); accept.configureBlocking(false ); SelectionKey scKey = accept.register(selector, 0 , null ); scKey.interestOps(SelectionKey.OP_READ); ByteBuffer buffer = Charset.defaultCharset().encode("a" .repeat(100000000 )); int write = accept.write(buffer); System.out.println(write); if (buffer.hasRemaining()){ scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE); scKey.attach(buffer); } }else if (key.isReadable()) { ByteBuffer buffer1 = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); int write = sc.write(buffer1); System.out.println(write); if (!buffer1.hasRemaining()){ key.attach(null ); key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); } } } } }
6.IO模型
阻塞IO
非阻塞IO
多路复用
信号驱动
异步IO
1.AIO
异步IO
在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) throws IOException { try (AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open( Paths.get("data.txt" ), StandardOpenOption.READ)){ ByteBuffer buffer = ByteBuffer.allocate(16 ); asynchronousFileChannel.read(buffer, 0 , buffer, new CompletionHandler <Integer, ByteBuffer>() { @Override public void completed (Integer result, ByteBuffer attachment) { attachment.flip(); while (attachment.hasRemaining()){ System.out.println((char )attachment.get()); } } @Override public void failed (Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); }catch (Exception e){ e.printStackTrace(); } System.in.read(); }
5.Netty 1.Netty入门
netty是一个异步的、基于事件驱动的网络应用框架,用于开发可维护、高性能的网络服务和客户端
用服务器端向客户端发送Hello
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) { new ServerBootstrap ().group(new NioEventLoopGroup ()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } }).bind(8080 ); }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws InterruptedException { new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }).connect("localhost" ,8080 ) .sync() .channel() .writeAndFlush("Hello" ); }
2.组件 1.EventLoop
事件循环对象,本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断地IO事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) { EventLoopGroup group = new NioEventLoopGroup (2 ); EventLoopGroup group1 = new DefaultEventLoop (); group.next().submit(()->{ try { Thread.sleep(1000 ); }catch (Exception e){ e.printStackTrace(); } System.out.println("ok" ); }); System.out.println("main" ); }
1 2 3 4 5 group.next().scheduleAtFixedRate(()->{ System.out.println("ok" ); },0 ,1 , TimeUnit.SECONDS); System.out.println("main" );
eventloop分工细化
1 2 3 new ServerBootstrap () .group(new NioEventLoopGroup (),new NioEventLoopGroup (2 ))
进一步细分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup (2 );new ServerBootstrap () .group(new NioEventLoopGroup (),new NioEventLoopGroup (2 )) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(eventLoopGroup,"handler1" ,new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(buf.toString(Charset.defaultCharset())); ctx.fireChannelRead(msg); } }); } }).bind(8080 );
2.Channel
channel
close() 关闭channel
closeFuture() 用来处理channel的关闭
sync 方法的作用是同步等待channel的关闭
addListener方法是异步等待channel的关闭
pipline() 方法添加处理器
write() 方法写入数据
writeAndFlush() 方法将数据写入并且刷出
channelFuture连接问题
1 2 3 4 5 6 7 8 9 10 11 12 13 ChannelFuture future = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }).connect("localhost" , 8080 ); future.channel().writeAndFlush("Hello" );;
channelFuture处理结果的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 ChannelFuture future = new Bootstrap () .group(new NioEventLoopGroup ()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder ()); } }).connect("localhost" , 8080 ); future.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { future.channel().writeAndFlush("Hello" ); } });
channelFuture关闭问题
问题:channel.close是异步的,因此无法在关闭后执行关闭后的操作,可能channel会在一秒钟之后才释放
1 2 3 4 5 6 7 8 9 10 11 12 Channel channel = future.sync().channel();new Thread (()->{ Scanner sc = new Scanner (System.in); while (true ){ String line = sc.nextLine(); if (line.equals("q" )){ channel.close(); break ; } channel.writeAndFlush(line); } },"input" ).start();
正确处理channelFuture关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 ChannelFuture closeFuture = channel.closeFuture(); System.out.println("等待关闭" ); closeFuture.sync(); System.out.println("处理关闭后的操作" ); closeFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { System.out.println("处理关闭后的操作" ); } });
当我们输入q时发现总java线程没有结束,处理该问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 NioEventLoopGroup group = new NioEventLoopGroup ();ChannelFuture future = new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new StringEncoder ()); } }).connect("localhost" , 8080 ); Channel channel = future.sync().channel();new Thread (()->{ Scanner sc = new Scanner (System.in); while (true ){ String line = sc.nextLine(); if (line.equals("q" )){ channel.close(); break ; } channel.writeAndFlush(line); } },"input" ).start(); ChannelFuture closeFuture = channel.closeFuture();closeFuture.addListener((ChannelFutureListener) ChannelFuture-> { System.out.println("处理关闭后的操作" ); group.shutdownGracefully(); }); closeFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { System.out.println("处理关闭后的操作" ); group.shutdownGracefully(); } });
3.Future & Promise
jdk-future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(2 ); Future<Integer> future = service.submit(new Callable <Integer>() { @Override public Integer call () throws Exception { Thread.sleep(1000 ); return 50 ; } }); System.out.println(future.get()); }
netty-future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup (); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable <Integer>() { @Override public Integer call () throws Exception { Thread.sleep(1000 ); return 50 ; } }); System.out.println(future.get()); }
异步获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup (); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable <Integer>() { @Override public Integer call () throws Exception { Thread.sleep(1000 ); return 50 ; } }); future.addListener(new GenericFutureListener <Future<? super Integer>>() { @Override public void operationComplete (Future<? super Integer> future) throws Exception { System.out.println(future.getNow()); } }); }
netty-promise
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup (); EventLoop eventLoop = group.next(); DefaultPromise<Integer> promise = new DefaultPromise <>(eventLoop); new Thread (()->{ System.out.println("开始" ); try { Thread.sleep(3000 ); }catch (Exception e){ e.printStackTrace(); } promise.setSuccess(999 ); }).start(); System.out.println(promise.get()); System.out.println("结束" ); }
传递错误的结果
1 2 3 4 5 6 7 8 9 10 new Thread (()->{ System.out.println("开始" ); try { Thread.sleep(3000 ); }catch (Exception e){ e.printStackTrace(); promise.setFailure(e); } promise.setSuccess(999 ); }).start();
4.Handler & Pipline
ChannelHandler用来处理Channel的各种事件
将消息传递给下一个handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1" ); ByteBuf buf = (ByteBuf) msg; String name = buf.toString(Charset.defaultCharset()); super .channelRead(ctx, name); } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("2" ); super .channelRead(ctx, msg); } });
模拟出站入站
1 2 3 4 5 6 EmbeddedChannel channel = new EmbeddedChannel ();channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("Hello" .getBytes())); channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hi" .getBytes()));
5.ByteBuf 1.创建 1 2 3 4 5 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();System.out.println(buf); String s = "a" .repeat(300 );buf.writeBytes(s.getBytes()); System.out.println(buf);
输出结果
1 2 PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256) PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)
2.直接内存和堆内存
堆内存
1 ByteBuf buf1 = ByteBufAllocator.DEFAULT.heapBuffer();
直接内存(默认)
1 ByteBuf buf2 = ByteBufAllocator.DEFAULT.directBuffer();
3.池化和非池化
可以重用Buf
1 2 3 System.out.println(buf1.getClass());
4.读取
buf.readByte() 读取一个字节
1 2 3 4 5 6 7 8 9 10 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10 );buf.writeBytes(new byte []{2 ,3 ,4 ,5 ,6 ,7 }); ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10 );buf1.writeBytes(new byte []{2 ,3 ,4 ,5 ,6 ,7 }); ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(20 );buffer.writeBytes(buf).writeBytes(buf1);
优化 CompositeByteBuf 避免了重复的复制
1 2 3 4 5 ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10 );buf.writeBytes(new byte []{2 ,3 ,4 ,5 ,6 ,7 }); CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();buffer.addComponents(true ,buf, buf1);
6.Netty进阶 1.粘包和半包问题处理措施 1.定长解码器 1 2 3 4 5 6 7 8 9 10 11 new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder (10 )); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); } }).connect("localhost" , 8080 );
2.LineBasedFrameDecoder 1 2 3 4 5 6 7 8 9 10 11 new Bootstrap () .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer <NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder (10 )); ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); } }).connect("localhost" , 8080 );
3.LengthFieldBaseFrameDecoder
lengthFieldOffset 长度字段偏移量
lengthFiledLength 长度字段长度
lengthAdjustment 长度字段为基准,还有几个字节是内容
initialBytesToStrip 从头剥离几个字节
2.协议的设计和解析
http协议
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public static void main (String[] args) { final Logger logger = LoggerFactory.getLogger(HttpServer.class); NioEventLoopGroup worker = new NioEventLoopGroup (); NioEventLoopGroup boss = new NioEventLoopGroup (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.channel(NioServerSocketChannel.class); bootstrap.group(boss,worker); bootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec ()); ch.pipeline().addLast(new ChannelInboundHandlerAdapter (){ @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpRequest){ }else if (msg instanceof HttpContent){ } } }); } }); ChannelFuture future = bootstrap.bind(8080 ).sync(); logger.info("Server started at http://127.0.0.1:{}" , 8080 ); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { worker.shutdownGracefully(); boss.shutdownGracefully(); } }
根据消息的类型区分SimpleChannelInboundHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.channel(NioServerSocketChannel.class); bootstrap.group(boss,worker); bootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler (LogLevel.DEBUG)); ch.pipeline().addLast(new HttpServerCodec ()); ch.pipeline().addLast(new SimpleChannelInboundHandler <HttpRequest>() { @Override protected void channelRead0 (ChannelHandlerContext ctx, HttpRequest msg) throws Exception { logger.info(msg.uri()); DefaultFullHttpResponse response = new DefaultFullHttpResponse (msg.protocolVersion(), HttpResponseStatus.OK); response.content().writeBytes("<h1>test<h1/>" .getBytes()); ctx.writeAndFlush(response); } }); } });
输出结果 [nioEventLoopGroup-2-1] INFO com.sun.net.httpserver.HttpServer - /index.html
1 response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,128);
3.自定义协议
魔数 第一时间判定是否是有效的数据包
版本号 可以支持协议的升级
序列化算法 消息正文
指令类型 登录、注册、群聊、私聊
请求序号 为了双工通信、提供异步能力
正文长度
消息正文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Decoder extends ByteToMessageCodec <Message> { @Override protected void encode (ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { out.writeBytes(new byte []{1 ,2 ,3 ,4 }); out.writeByte(1 ); out.writeByte(0 ); out.writeByte(2 ); out.writeByte(5 ); out.writeByte(0xff ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream obs = new ObjectOutputStream (bos); obs.writeObject(msg); byte [] byteArray = bos.toByteArray(); out.writeByte(byteArray.length); out.writeBytes(byteArray); } @Override protected void decode (ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { } }
定义解码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class Decoder extends ByteToMessageCodec <Message> { final Logger logger = LoggerFactory.getLogger(Decoder.class); @Override protected void encode (ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { out.writeBytes(new byte []{1 ,2 ,3 ,4 }); out.writeByte(1 ); out.writeByte(0 ); out.writeByte(2 ); out.writeByte(5 ); out.writeByte(0xff ); ByteArrayOutputStream bos = new ByteArrayOutputStream (); ObjectOutputStream obs = new ObjectOutputStream (bos); obs.writeObject(msg); byte [] byteArray = bos.toByteArray(); out.writeByte(byteArray.length); out.writeBytes(byteArray); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception { int magicMsg = in.readInt(); byte version = in.readByte(); byte serializeType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.writeBytes(bytes,0 ,length); if (serializeType == 0 ){ ByteArrayInputStream bis = new ByteArrayInputStream (bytes); ObjectInputStream ois = new ObjectInputStream (bis); Message msg = (Message)ois.readObject(); logger.info(msg.toString()); list.add(msg); } } }
7.聊天业务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package richuff.top;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ChatServer { static Logger logger = LoggerFactory.getLogger(ChatServer.class); public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.channel(NioServerSocketChannel.class); bootstrap.group(boss,worker); bootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder (1024 ,0 ,0 ,0 ,0 )); } }); bootstrap.bind(8080 ).sync().channel(); }catch (InterruptedException e){ logger.info(String.valueOf(e)); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }