java NIO入门 weir 2017-11-11 11:54:12.0 java nio 2018 之前这篇文章(java IO NIO(2) AIO漫谈)胡扯了很多计算机底层的知识,总之一句话:没有操作系统底层对于字节流和字符流的封装(缓存和内存映射),就没有高性能一说了,明天就不扯那么远了,今天还要扯一扯java NIO,刚才一走神忘了要说什么,想想还是说一下IO的多路复用技术,他的高明之处是实现了一个线程同时处理多个I/O请求。具体实现技术就是Reactor模式,理解IO多路复用还是要了解操作系统底层实现,又回来了,没必要去说了网上真的大把,但是说实话自己写一套还是另外一回事,今天重点是我们的NIO它的实现也是基于操作系统的,我们就通过一个例子说起: 先看服务器的: //通道 private ServerSocketChannel serverSocketChannel = null; //发送缓冲池和接收缓冲池 private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); //注册器对象(选择器) private Selector selector = null; //端口 private int port = 9999; //对注册器上面的事件缓存 private static Map msg = new HashMap<>(); public NIOService(int port) throws Exception { this.port = port; serverSocketChannel = serverSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocketChannel.configureBlocking(false); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NIO服务器启动:监听端口:"+port); } 就这样你的socket就开通了,建立socket不叫链接了而是通道,发消息接消息不再是字节流和字符流而是变成了缓冲池。 注册器是什么,你可以理解为一个线程,其实他就是一个线程而且多线程环境下也是线程安全的,如果该注册器上面有消息来往就会产生多个SelectionKey,后面通过监听来处理一个个的SelectionKey就行了。 //监听 public void listen() throws Exception { while (true) { //判断是否有消息事件进来 int select = selector.select(); if (select == 0) { continue; } //拿到所有的事件连接 SetselectedKeys = selector.selectedKeys(); Iteratoriterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = (SelectionKey) iterator.next(); //处理每个事件消息 handleKey(selectionKey); //处理完删除 iterator.remove(); } } } 一个selector可以监听多个Channel通道,那么通道是什么?通道就是一个个的客户端连接,再进一步说就是一个线程可以处理多个客户端的请求 private void handleKey(SelectionKey selectionKey) throws Exception { SocketChannel client = null; try { //有效的,准备好的 if (selectionKey.isValid() && selectionKey.isAcceptable()) { //开始接收 client = serverSocketChannel.accept(); //非阻塞 client.configureBlocking(false); //改变事件状态,读状态 client.register(selector, SelectionKey.OP_READ); //有效的,可读的 }else if (selectionKey.isValid() && selectionKey.isReadable()) { //先把接收缓存池清一下 receiveBuffer.clear(); //得到一个通道 client = (SocketChannel) selectionKey.channel(); //开始读 int read = client.read(receiveBuffer); if (read>0) { String string = new String(receiveBuffer.array(),0,read); msg.put(selectionKey, string); System.out.println("获取到客户端信息:"+string); //读完之后,设置为可写状态 client.register(selector, SelectionKey.OP_WRITE); } //有效的,可写的 }else if (selectionKey.isValid() && selectionKey.isWritable()) { if (!msg.containsKey(selectionKey)) { return; } client = (SocketChannel) selectionKey.channel(); //写之前清空 sendBuffer.clear(); sendBuffer.put(new String(msg.get(selectionKey)+"发送OK").getBytes()); //将写模式转变为读模式 sendBuffer.flip(); client.write(sendBuffer); //状态设置为可读 client.register(selector, SelectionKey.OP_READ); } } catch (Exception e) { try { selectionKey.cancel(); client.socket().close(); client.close(); return; } catch (Exception e1) { e1.printStackTrace(); } } } 重点就在这里了,selectionKey上面有几种状态: 读 ,写,准备好,可接受,有效的。为什么会有这些状态出现,IO我们知道反正是一直阻塞,没有准备好不好这个概念,NIO为什么有这些状态,我们的接受和发送缓冲池你如果能理解你就很容易理解为什么有这些状态,就好比一个水池,一头进水一头接水,进满水了是不是要把进水阀关上不能再进了,然后通知接水的准备好可以接水了;水全部接完了道理一样。 看到这里我们回头看一下ByteBuffer,看源码你就会发现他就是维护一个数组的东西,那我们这里把他叫缓冲池是否准确?不知道有没有人听说过netty,它里面有个ByteBuf,好我们现在把缓冲池的概念丢掉,我们给他再起一个名字叫缓冲区,NIO中的ByteBuffer和netty中的ByteBuf我们都叫缓冲区,对于NIO来说是不是有通道来读写缓冲区的数据,看看: int read = client.read(receiveBuffer); client.write(sendBuffer); 再多说一下netty,netty里面不但有ByteBuf的概念还有内存池的概念用来专门维护这些ByteBuf,在我看来这才是netty高性能的核心提现之处。 我想除了CPU,没有比读取内存数据更快的介质了,hadoop为什么没有storm和spark,处理速度及时,一个是读取IO一个是读取内存,根本就不在一个数量级。 为什么memcached,varnish响应速度快得吓人,还不是利用了内存,还有阿里的rocketmq都是基于内存的高性能。 有人说把mysql安装到内存里面那简直快爆了,这个我信,因为我之前也接触过把db2数据库安装到内存,通过hibernate访问,速度相差不是几十几百个数量级,那是成千上万的数量级。 那就有人问了为什么jdk的 NIO 不做成像netty那样的内存池来管理缓冲区呢?说实话我也有这样的疑问 ByteBuffer里面有个 position,我们知道这个所谓的缓冲区就是一个固定长度且可维护的数组,那你就可以把position理解为该数组的下标,当该下标大于零就表明有数据了,如果该下标等于零就有两种情况,要么有数据准备读,要么没数据准备写。你要是想更清楚理解不妨百度谷歌一下我想你会找到答案的。 最后贴出服务店代码: package springbootdubbo.rocketmqTest; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; public class NIOService { //通道 private ServerSocketChannel serverSocketChannel = null; //发送缓冲池和接收缓冲池 private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); //注册器对象(选择器) private Selector selector = null; //端口 private int port = 9999; //对注册器上面的事件缓存 private static Map msg = new HashMap<>(); public NIOService(int port) throws Exception { this.port = port; serverSocketChannel = serverSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocketChannel.configureBlocking(false); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("NIO服务器启动:监听端口:"+port); } //监听 public void listen() throws Exception { while (true) { //判断是否有消息事件进来 int select = selector.select(); if (select == 0) { continue; } //拿到所有的事件连接 SetselectedKeys = selector.selectedKeys(); Iteratoriterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = (SelectionKey) iterator.next(); //处理每个事件消息 handleKey(selectionKey); //处理完删除 iterator.remove(); } } } private void handleKey(SelectionKey selectionKey) throws Exception { SocketChannel client = null; try { //有效的,准备好的 if (selectionKey.isValid() && selectionKey.isAcceptable()) { //开始接收 client = serverSocketChannel.accept(); //非阻塞 client.configureBlocking(false); //改变事件状态,读状态 client.register(selector, SelectionKey.OP_READ); //有效的,可读的 }else if (selectionKey.isValid() && selectionKey.isReadable()) { //先把接收缓存池清一下 receiveBuffer.clear(); //得到一个通道 client = (SocketChannel) selectionKey.channel(); //开始读 int read = client.read(receiveBuffer); if (read>0) { String string = new String(receiveBuffer.array(),0,read); msg.put(selectionKey, string); System.out.println("获取到客户端信息:"+string); //读完之后,设置为可写状态 client.register(selector, SelectionKey.OP_WRITE); } //有效的,可写的 }else if (selectionKey.isValid() && selectionKey.isWritable()) { if (!msg.containsKey(selectionKey)) { return; } client = (SocketChannel) selectionKey.channel(); //写之前清空 sendBuffer.clear(); sendBuffer.put(new String(msg.get(selectionKey)+"发送OK").getBytes()); //将写模式转变为读模式 sendBuffer.flip(); client.write(sendBuffer); //状态设置为可读 client.register(selector, SelectionKey.OP_READ); } } catch (Exception e) { try { selectionKey.cancel(); client.socket().close(); client.close(); return; } catch (Exception e1) { e1.printStackTrace(); } } } public static void main(String[] args) { try { new NIOService(9999).listen(); } catch (Exception e) { e.printStackTrace(); } } } 客户端代码: package springbootdubbo.rocketmqTest; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Scanner; import java.util.Set; public class NIOClient { //发送缓冲池和接收缓冲池 private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); //注册器对象(选择器) private Selector selector = null; private InetSocketAddress socketAddress = new InetSocketAddress("localhost", 9999); private SocketChannel client = null; public NIOClient() throws Exception { client = SocketChannel.open(); client.configureBlocking(false); client.connect(socketAddress); selector = Selector.open(); client.register(selector, SelectionKey.OP_CONNECT); } public void clientServer() throws Exception { if (client.isConnectionPending()) { client.finishConnect(); System.out.println("客户端连接OK"); client.register(selector, SelectionKey.OP_WRITE); } Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String string = (String) scanner.next(); if ("".equals(string)) { continue; }else if ("exit".equals(string)) { System.exit(0); } handleKey(string); } } private void handleKey(String msg) throws Exception { boolean isWait = true; Iteratoriterator = null; SetselectedKeys = null; try { while (isWait) { int select = selector.select(); if (select == 0) { continue; } selectedKeys = selector.selectedKeys(); iterator = selectedKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = (SelectionKey) iterator.next(); if (selectionKey.isValid() && selectionKey.isWritable()) { sendBuffer.clear(); sendBuffer.put(msg.getBytes()); sendBuffer.flip(); client.write(sendBuffer); client.register(selector, SelectionKey.OP_READ); }else if (selectionKey.isValid() && selectionKey.isReadable()) { receiveBuffer.clear(); int read = client.read(receiveBuffer); if (read>0) { receiveBuffer.flip(); System.out.println("服务器返回的信息:"+new String(receiveBuffer.array(),0,read)); client.register(selector, SelectionKey.OP_WRITE); isWait = false; } } } iterator.remove(); } } catch (Exception e) { ((SelectionKey)selectedKeys).cancel(); client.socket().close(); client.close(); return; } } public static void main(String[] args) { try { new NIOClient().clientServer(); } catch (Exception e) { e.printStackTrace(); } } } 自己运行试试吧 索引 说明 capacity 缓冲区数组的总长度 position 下一个要操作的数据元素的位置 limit 缓冲区数组中不可操作的下一个元素的位置:limit<=capacity mark 用于记录当前position的前一个位置或者默认是-1 初始: position Limit capacity 写入4个字节 position Limit capacity 如果这个时候要把这4个字节的数据写入通道channel,就需要调用buffer的flip()方法: position Limit capacity 写完之后,另一端就可以读到该数据了。读完之后再写就需要调用clear()方法,回到初始状态。 看到这里感觉java NIO没毛病,但是你了解了netty的buffer就会发现,javaNIO还是不够完美。 这里也画一下netty的buffer: 初始: readerIndex writerIndex 当有N个数据写入时: readerIndex writerIndex 当有M(M<=N)个被读取之后: readerIndex writerIndex 前面一段就是可回收的 大家发现这样看起来比JAVA NIO简单了点,但是内部维护机制可不简单。说白了就是复杂的隐藏掉,暴露简单的。 说一点题外话,其实是现代计算机的基础,先从CPU说起,CPU里面是有一些基本的原子性操作指令的,在计算机世界里面叫指令集。 每一款每个型号的CPU指令集都不一完全一样,大家想最接近CPU指令集的指令是什么?我想应该是汇编语言,C还在汇编之后,当时我们也学了汇编但是基本等于没学, 给我们上课的老师是部队的开发单片机的。C以上的高级语言都会把程序编译成二进制数据为什么?比如GO语言编译之后可以直接执行,因为二进制可以直接让CPU执行,性能当然高了。 那么java我们都知道是跑在JVM虚拟机之上的,还多底层的接口调用的是C语言的接口,Direct Memory和Unsafe对象这两个东西不知道你了解否,就是绕过JVM做事情的,这在java世界是不允许的。 不允许不代表不能,否则叫它高级语言就徒有虚名了。Netty4.X之后的内存池就是直接管理内存,大家可以慢慢深入研究,推荐文章(http://blog.csdn.net/youaremoon/article/details/50054387)分析得相当透彻。 NIO我们用到最多的还是对文件的处理和socket网络的处理,先说socket(TCP),TCP最头疼的不是编码解码而是粘包 拆包 丢包,这几个问题都是因为TCP这个协议顶层设计存在的问题。 如果让你用java NIO来处理这些问题,简单的还可以复杂的业务场景就需要花大力气投入研发了,还好有netty帮我们解决了这些问题。先暂停一下,扯点别的东西,我们仔细想想网络传输现在速度快不快,当然是光速的快,但是不要忘了我们的计算机是怎么处理像光速一样传输的数据的,使用电流的开关(脉冲),把光转换为电,再把电转换为0 1 ,再把0 1 装换为字节,再把字节装换为文字图像等等。我们假设TCP 传输的是字节,你可以想想网络传输是有序的么?网络传输肯定是无序的,但是TCP要保证最终是有序的,否则你想想接收到的数据你如果处理,你接收到的数据只会不断在后面追加,这点不知道大家能否理解。 可以这样说netty很容易就可以搭建一个服务器而且性能很高,不管你是做RPC,聊天服务器,还是消息转发,仅仅是公司内部需要可以做的简单点,要是像dubbo这样的RPC那需要考虑的就不只是一个RPC协议了。 我们再说说java NIO的文件处理,文件处理其实没有网络处理复杂,java到今天对文件的处理也是做到了极致,rocketmq大家知道的话就会知道,它的持久化就是保存到了文件上,性能也是杠杠的,就是用到了java NIO中的MappedByteBuffer,还有内存映射的概念,直接从内存到磁盘文件,中间不经过IO处理。