NIO学习笔记及实现简单聊天室
|Word count:2.5k|Reading time:9min|Post View:
NIO学习笔记及实现简单聊天室
NIO简介
Java NIO(New IO) 是从Java 1.4版本开始引入的 一个新的IO API,可以替代标准的Java IO API。 NIO与原来的IO有同样的作用和目的,但是使用 的方式完全不同, NIO支持面向缓冲区的、基于 通道的IO操作。 NIO将以更加高效的方式进行文 件的读写操作
NIO与IO主要区别
NIO通道与缓冲区
通道(Channel)和缓冲区 (Buffer)。通道表示打开到 IO 设备(例如:文件、 套接字)的连接。若需要使用 NIO 系统,需要获取 用于连接 IO 设备的通道以及用于容纳数据的缓冲 区。然后操作缓冲区,对数据进行处理 。Java NIO 中的 Buffer 主要用于与 NIO 通道进行 交互,数据是从通道读入缓冲区,从缓冲区写 入通道中的。
Buffer
Buffer 就像一个数组,可以保存多个相同类型的数据。根 据数据类型不同(boolean 除外) ,有以下 Buffer 常用子类: ByteBuffer CharBuffer ShortBuffer IntBuffer LongBuffer FloatBuffer DoubleBuffer 上述 Buffer 类 他们都采用相似的方法进行管理数据,只是各自 管理的数据类型不同而已。
都是通过如下方法获取一个 Buffer 对象: static XxxBuffer allocate(int capacity) : 创建一个容量为 capacity 的 XxxBuffer 对象
Buffer 中的重要概念
容量 (capacity) : 表示 Buffer 最大数据容量,缓冲区容量不能为负,并且创 建后不能更改。
限制 (limit): 第一个不应该读取或写入的数据的索引,即位于 limit 后的数据 不可读写。缓冲区的限制不能为负,并且不能大于其容量。
位置 (position): 下一个要读取或写入的数据的索引。缓冲区的位置不能为 负,并且不能大于其限制
标记 (mark)与重置 (reset): 标记是一个索引,通过 Buffer 中的 mark() 方法 指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这 个 position.
标记、 位置、 限制、 容量遵守以下不变式: 0 <= mark <= position <= limit <= capacity
常用方法
直接与非直接缓冲区
字节缓冲区要么是直接的,要么是非直接的。如果为直接字节缓冲区,则 Java 虚拟机会尽最大努力直接在 此缓冲区上执行本机 I/O 操作。也就是说,在每次调用基础操作系统的一个本机 I/O 操作之前(或之后), 虚拟机都会尽量避免将缓冲区的内容复制到中间缓冲区中(或从中间缓冲区中复制内容)。
直接字节缓冲区可以通过调用此类的 allocateDirect() 工厂方法来创建。此方法返回的缓冲区进行分配和取消 分配所需成本通常高于非直接缓冲区。直接缓冲区的内容可以驻留在常规的垃圾回收堆之外,因此,它们对 应用程序的内存需求量造成的影响可能并不明显。所以,建议将直接缓冲区主要分配给那些易受基础系统的 本机 I/O 操作影响的大型、持久的缓冲区。一般情况下,最好仅在直接缓冲区能在程序性能方面带来明显好 处时分配它们。
直接字节缓冲区还可以通过 FileChannel 的 map() 方法 将文件区域直接映射到内存中来创建。该方法返回 MappedByteBuffer 。 Java 平台的实现有助于通过 JNI 从本机代码创建直接字节缓冲区。如果以上这些缓冲区 中的某个缓冲区实例指的是不可访问的内存区域,则试图访问该区域不会更改该缓冲区的内容,并且将会在 访问期间或稍后的某个时间导致抛出不确定的异常。
字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其 isDirect() 方法来确定。提供此方法是为了能够在 性能关键型代码中执行显式缓冲区管
通道(Channel)
通道(Channel):由 java.nio.channels 包定义 的。 Channel 表示 IO 源与目标打开的连接。 Channel 类似于传统的“流”。只不过 Channel 本身不能直接访问数据, Channel 只能与 Buffer 进行交互。
实现类
•FileChannel:用于读取、写入、映射和操作文件的通道。
•DatagramChannel:通过 UDP 读写网络中的数据通道。
•SocketChannel:通过 TCP 读写网络中的数据。
•ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来 的连接都会创建一个 SocketChannel。
获取通道
获取通道的一种方式是对支持通道的对象调用 getChannel() 方法。支持通道的类如下:
FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
获取通道的其他方式是使用 Files 类的静态方法 newByteChannel() 获 取字节通道。或者通过通道的静态方法 open() 打开并返回指定通道
Selector
选择器(Selector) 是 SelectableChannle 对象的多路复用器, Selector 可 以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector 可使一个单独的线程管理多个 Channel。 Selector 是非阻塞 IO 的核心。
SelectionKey
SelectionKey: 表示 SelectableChannel 和 Selector 之间的注册关系。每次向 选择器注册通道时就会选择一个事件(选择键)。 选择键包含两个表示为整 数值的操作集。操作集的每一位都表示该键的通道所支持的一类可选择操 作
NIO非阻塞实现类
聊天室
服务端使用循环不断地获取Selector的select()方法返回值,当该返回值大于0时就处理改Selector上被选择的SelectionKey所对应的Channel,select()方法监听所有Channel上的IO操作
服务器端的Selector监听两种操作:连接和读数据
处理连接操作,系统只需将连接完成后的SocketChannel注册到指定Selector
处理读数据操作:系统先从该Socket中读取数据,再将数据写入Selector上注册的所有Channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset;
public class Server { private Selector selector = null; private static final int PORT = 30001; private Charset charset = Charset.forName("UTF-8"); private void init()throws IOException { selector = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); InetSocketAddress isa = new InetSocketAddress("localhost", PORT); server.bind(isa); server.configureBlocking(false); server.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { for (SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); if (sk.isAcceptable()) { SocketChannel sc = server.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); sk.interestOps(SelectionKey.OP_ACCEPT); } if (sk.isReadable()) { SocketChannel sc = (SocketChannel)sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); StringBuilder content = new StringBuilder(); try { while(sc.read(buff) > 0) { buff.flip(); content.append(charset.decode(buff)); } System.out.println("读取的数据:" + content); sk.interestOps(SelectionKey.OP_READ); } catch (IOException ex) { sk.cancel(); if (sk.channel() != null) { sk.channel().close(); } } if (content.length() > 0) { for (SelectionKey key : selector.keys()) { Channel targetChannel = key.channel(); if (targetChannel instanceof SocketChannel) { SocketChannel dest = (SocketChannel)targetChannel; dest.write(charset.encode(content.toString())); } } } } } } } public static void main(String[] args) throws IOException { new Server().init(); } }
|
客户端需要两个线程,一个负责读取键盘输入,并写入SocketChannel。
另一个不断查询Selector对象的select()的返回值 大于0 有IO操作需要处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner;
public class Client { private Selector selector = null; private static final int PORT = 30001; private Charset charset = Charset.forName("UTF-8"); private SocketChannel sc = null; private void init()throws IOException { selector = Selector.open(); InetSocketAddress isa = new InetSocketAddress("localhost", PORT); sc = SocketChannel.open(isa); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); new ClientThread().start();
Scanner scan = new Scanner(System.in); while (scan.hasNextLine()) { String line = scan.nextLine(); sc.write(charset.encode(line)); } } private class ClientThread extends Thread { public void run() { try { while (selector.select() > 0) { for (SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); if (sk.isReadable()) { SocketChannel sc = (SocketChannel)sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); StringBuilder content = new StringBuilder(); while(sc.read(buff) > 0) { sc.read(buff); buff.flip(); content.append(charset.decode(buff)); } System.out.println("聊天信息:" + content); sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Client().init(); } }
|