3、Java的NIO概览(一)——Channels、Buffers、Selectors(Windows环境下)

  Java NIO 由以下几个核心部分组成:
①、Channels,类似于传统IO中的流(Stream);
②、Buffers;
③、Selectors
虽然Java NIO 中除此之外还有很多类和组件,但是核心还是Channel,Buffer 和 Selector ,像其它组件Pipe、FileLock,只不过是与三个核心组件共同使用的工具类。Channel和Buffer的关系,如下图所示:
clipboard

  NIO中有4个主要的Channel的实现:
①、abstract FileChannel(负责文件的数据读取),具体实现是FileChannelImpl.class
②、DatagramChannel(负责UDP协议网络数据的读取)
③、abstract SocketChannel(负责TCP协议网络数据的读取,一般用在TCP协议的客户端),具体实现是SocketChannelImpl.class
④、abstract ServerSocketChannel(负责TCP协议网络数据的读取,一般用在TCP协议的服务端)具体实现是ServerSocketChannelImpl.class
与Channel相关的类的UML关系图,如下所示:
clipboard

  NIO中有7个主要的Buffer的实现:
①、abstract ByteBuffer(负责byte数据类型的发送),具体实现在HeapByteBuffer.class(该类的实例是在Java的堆中创建的,由JVM负责垃圾回收)和DirectByteBuffer.class(该类用的是直接内存,该内存由操作系统OS进行管理,使用时可以通过JNI调用一些函数进行创建内存和释放内存)中;
②、abstract CharBuffer(负责char数据类型的发送),同①,同样有堆中创建的实例和创建直接内存的2个实现类;
③、abstract DoubleBuffer(负责double数据类型的发送),同①,同样有堆中创建的实例和创建直接内存的2个实现类;
④、abstract FloatBuffer(负责float数据类型的发送),同①,同样有堆中创建的实例和创建直接内存的2个实现类;
⑤、abstract IntBuffer(负责int数据类型的发送),同①,同样有堆中创建的实例和创建直接内存的2个实现类;
⑥、abstract LongBuffer(负责long数据类型的发送),同①,同样有堆中创建的实例和创建直接内存的2个实现类;
⑦、abstract ShortBuffer(负责short数据类型的发送),同①,同样有堆中创建的实例和创建直接内存的2个实现类;
与Buffer相关的类的UML关系图,如下所示:
clipboard

  Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便(比如聊天类应用)。如下所示:
clipboard

要使用Selector,需要在Channel上注册Selector,然后调用它的select()函数。select()函数会一直阻塞到某个注册了这个Selector的Channel上有事件就绪。一旦这个方法返回,线程就可以处理这些事件。Selector可以处理的事件有以下4种:

事件名 对应值
服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件 SelectionKey.OP_CONNECT(8)
读事件 SelectionKey.OP_READ(1)
写事件 SelectionKey.OP_WRITE(4)

与Selector相关的类的UML图,如下所示:
clipboard

一、Channel的简单使用和介绍

  Java NIO的Channel类似Stream,但又有些不同,有以下3点区别:
①、既可以从Channel中读取数据,又可以写数据到通道。但Stream的读写通常是单向的;
②、Channel可以异步地读写;
③、Channel中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入;
下面是一个使用FileChannel读取数据到Buffer中的示例:

package com.chelong.test;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileChannelTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile aFile = new RandomAccessFile("D:\\nio-data.txt", "rw");
      FileChannel inChannel = aFile.getChannel();
      //分配一个长度为33byte的ByteBuffer,此处的具体实现是HeapByteBuffer
      ByteBuffer buf = ByteBuffer.allocate(33);
      int bytesRead = inChannel.read(buf);
      while (bytesRead != -1) {
         System.out.println("Read " + bytesRead);
         //HeapByteBuffer使用前需要先调用filp()函数
         buf.flip();
         while(buf.hasRemaining()){//如果HeapByteBuffer中还有任意长度的byte没有获取出来,hasRemaining()函数将会返回false
            System.out.print((char) buf.get());
         }
         buf.clear();//重置position、limit、mark的值
         bytesRead = inChannel.read(buf);
      }
      aFile.close();
   }
}

我的windows操作系统的D盘根目录下有nio-data.txt文件,如下所示:
clipboard

程序运行结果,如下所示:
clipboard

二、Buffer的简单使用和介绍(使用的例子参考标题一)

  Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。当你将数据写入缓冲区时,缓冲区会记录你已写入的数据量。一旦您需要读取这些数据,就需要使用flip()函数调用将缓冲区从写入模式切换到读取模式。在读取模式下,缓冲区允许你读取写入到缓冲区中的所有数据。使用Buffer读写数据一般分为以下4个步骤:
①、写入数据到Buffer;
②、调用flip()函数;
③、从Buffer中读取数据;
④、调用clear()函数或者compact()函数;
当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,需要通过flip()函数将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。
  在读取完所有数据后,需要清空缓冲区,以便再次进行写入操作。可以通过两种方式来实现:通过调用 clear() 函数,或者通过调用 compact() 函数。
①、clear() 函数会清空整个缓冲区;
②、而 compact() 函数只会清空您已经读取的数据,未读取的数据会被移动到缓冲区的开头,当有新的数据写入时,将从未读取的数据之后开始写入缓冲区。

2.1、Buffer中的position 、limit、capacity

  Buffer本质上就是一块内存区域,您可以在其中写入数据,之后还可以再次读取这些数据。这块内存区域被封装在一个 NIO 缓冲对象中,该对象提供了一些函数,使得对这块内存区域的操作更加简便。为了理解Buffer的工作原理,需要熟悉它的3个属性:

private int position = 0;
private int limit;
private int capacity;

int position和int limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,int capacity的含义总是一样的。如下图所示:
clipboard

2.1.1、capacity

  作为一个内存块,Buffer具有一定的固定大小,这个大小也被称为其“capacity”。只能向Buffer写入capacity个byte、long、char等类型数据。一旦缓冲区已满,需要先将其清空(读取数据或清除内容),然后再才能向Buffer写入其它数据。

2.1.2、position

  当您将数据写入Buffer时,是按照指定的position位置进行写入的。初始时,该位置为 0。当一个byte、long、或者其它类型数据被写入到Buffer以后, position会向前移动到下一个可插入数据的Buffer单元,以便插入数据。position最大值可为capacity – 1。
  当从Buffer读取数据时,也是从position指定的位置进行读取。当Buffer从写入模式切换到读取模式时,position=0。在从Buffer读取数据的过程中,是从position位置开始读取的,并且position会向前移动到下一个位置(position+=1)以继续读取。

2.1.3、limit

  在写入模式下,limit表示能够写入到该Buffer中的数据量的上限,limit=capacity。在读模式下,limit表示还能读取的数据单位(多少个int、byte等),因此,当Buffer从写模式切换到读模式下时,limit=写入模式下的写入位置,换句话说,你能读到之前写入的所有数据(limit被设置成已写数据的数量,这个值在写模式下就是position)

2.2、分配Buffer的方法

  要获取一个Buffer对象,必须首先对其进行分配。每个Buffer对象的获取都有一个名为allocate()的函数来实现这一操作。下面的例子分配了1个capacity=48个byte的ByteBuffer的例子:

//伪代码
ByteBuffer buf = ByteBuffer.allocate(48);

如果要分配1个capacity=1024个字符的CharBuffer,如下:

//伪代码
CharBuffer buf = CharBuffer.allocate(1024);

相应的,如果要分配int类型的Buffer,可以使用IntBuffer,java中其余4个基本数据类型分别用DoubleBuffer、FloatBuffer、LongBuffer、ShortBuffer来分配,基本类型中的boolean类型没有对应的Buffer。

2.3、向Buffer中写数据

  写数据到Buffer有2种方式:
①、从Channel写到Buffer,比如标题一中的例子:

//伪代码
int bytesRead = inChannel.read(buf);

②、通过Buffer的put()函数写到Buffer里,如下所示:

//伪代码
buf.put(127)

put()函数有很多不同入参的重载,允许你以不同的方式把数据写入到Buffer中。例如, 写到一个指定的位置,或者把一个字节数组byte[]写入到Buffer。

2.3.1、flip()函数

  flip()函数将Buffer从写模式切换到读模式。调用flip()函数会设置position=0,同时设置limit=写模式的position的值。换句话说,position现在用于标记读的位置,limit表示写模式下写进了多少个byte、char等(也就是现在的读模式下可以读多少个byte、char等)。

2.4、从Buffer中读数据

  从Buffer中读取数据有2种方式:
①、从Channe读取数据到Buffer,比如标题一中的例子;

//伪代码
int bytesRead = inChannel.read(buf);

②、使用get()函数从Buffer中读取数据;

//伪代码
byte aByte = buf.get();  

get()函数有很多不同入参的重载,允许你以不同的方式从Buffer中读取数据。例如,从指定position读取,或者从Buffer中读取数据到字节数组byte[]。

2.5、rewind()函数

  Buffer.class::rewind()函数是设置position=0,所以你可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(除了boolean的byte、char等基本数据类型)。

2.6、clear()函数与compact()函数

  一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()函数或compact()函数来完成。
  如果调用的是clear()函数,将设置position=0,capacity=limit 。换句话说,Buffer 被清空了。Buffer中的数据并未清除,如果要往Buffer中再写入数据的话,只是用这些标记(变量)告诉我们可以从哪里开始往Buffer里写数据。如果Buffer中有一些未读的数据,调用clear()函数后,这些数据将“被遗忘”,意味着不再有任何标记(变量)会告诉你哪些数据被读过,哪些还没有。
  如果Buffer中仍有未读的数据,且后续还需要这些数据,但是此时想要先写入一些数据,那么使用compact()函数。compact()函数将所有未读的数据拷贝到Buffer起始处。然后设置position=最后一个未读元素+1。设置limit=capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

2.7、mark()函数和reset()函数

  通过调用Buffer.class::mark()函数,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.class::reset()函数恢复到这个position。例如(伪代码):

buffer.mark();
//call buffer.get() a couple of times, e.g. during parsing.
buffer.reset();  //set position back to mark.  
2.8、equals()函数和compareTo()函数

  可以使用 equals() 和 compareTo() 函数来对两个Buffer进行比较。

2.8.1、equals()函数

  当满足下列2个条件时,表示2个Buffer相等:
①、有相同的类型(byte、char、int等);
②、Buffer中剩余的byte、char等的个数相等且内容相同;
因此,equals()函数只是比较Buffer的一部分,不是每一个在它里面的元素都比较。实际上,它只比较Buffer中的剩余元素。

2.8.2、compareTo()函数

  compareTo()函数比较两个Buffer的剩余元素(byte、char等), 如果满足下列条件,则认为一个Buffer“小于”另一个Buffer:
①、一个Buffer中第1个不相等的元素小于另一个Buffer中对应位置的元素(比如:abc<acc);
②、所有元素都相等,但是一个Buffer的元素个数比另一个Buffer少(比如abc<abcd);

三、NIO的分散读和聚集写

  Java NIO支持分散读和聚集写,分散读和聚集写用于描述从Channel中读取或者写入到Channel的操作。
  分散读和聚集写在对传输数据的不同部分进行单独处理的情况下非常有用。比如,如果一条消息包含一个消息头和一个消息体,分散读和聚集写可以将消息头和消息体分别存放在不同的Buffer中。这样做可以更方便地分别处理消息头和消息体。

3.1、分散读

  从Channel进行的分散读取是一种读取操作,它将数据读入多个Buffer中。因此,该Channel会将其中的数据分散存储到多个Buffer中。如下所示:
clipboard

//伪代码
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

channel.read(bufferArray);

请注意,Buffer首先被插入到一个数组ByteBuffer[]中,然后该数组ByteBuffer[]作为参数传递给 channel.read() 函数。然后,read() 函数会按照Buffer在数组ByteBuffer[]中的出现顺序将通道中的数据写入。一旦一个Buffer满了,Channel就会继续填充下一个Buffer。
  由于分散读操作会在1个Buffer填满后再转移到下一个Buffer,这意味着它并不适用于动态大小的消息部分。换句话说,如果消息包含一个消息头和一个消息体,并且消息头的大小是固定的(例如 128 字节),那么分散读的操作是可行的。

3.2、聚集写

  向Channel聚集写是一种写操作,它将来自多个Buffer的数据写入到一个单一的Channel中。因此,该Channel会将来自多个Buffer的数据汇集其中。如下所示:
clipboard

//伪代码
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

//write data into channel

ByteBuffer[] bufferArray = { header, body };

channel.write(bufferArray);

  这些Buffer被传入channel.write()函数中,该函数会按照Buffer在数组ByteBuffer[]中的出现顺序将每个Buffer中的内容写入Channel。但是,只写入Buffer中从int position到int limit之间的数据。因此,如果一个Buffer的容量为 128 byte,但只包含 58 个byte,那么仅从该Buffer向Channel写入 58 个byte。因此,聚集写对于动态大小的消息部分能够正常工作(这一点与分散读是不同,比如3.1的例子中,分散读只能读取读取固定大小的消息头)。

四、NIO中Channel之间的数据传输

  在 Java NIO 中,如果2个Channel中有一个Channel是FileChannel,那么就可以直接将数据从一个Channel传输到另一个Channel。这种2个Channel之间的数据传递可以用FileChannel.class:: transferTo()函数 和 FileChannel.class:: transferFrom() 。

4.1、transferFrom()函数

  FileChannel.class:: transferFrom()函数可以从源Channel的取数据到FileChannel,如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
public class ChannelToChannelTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
      FileChannel fromChannel = fromFile.getChannel();

      RandomAccessFile toFile = new RandomAccessFile("D:\\toFile.txt", "rw");
      FileChannel toChannel = toFile.getChannel();
      //目标文件toFile.txt中开始写入的位置  
      long position = 0;
      //要传输给toFile.txt文件的byte数量最大值,如果fromFile.txt中包含的byte数量<count,则只会按照fromFile.txt中包含的byte数量来传输
      //此处取fromFile.txt文件中包含的byte数量
      long count = fromChannel.size();

      toChannel.transferFrom(fromChannel, position, count);
   }
}

我的windows操作系统的D盘根目录下有fromFile.txt文件和toFile.txt文件,如下所示:
clipboard
clipboard

程序运行完后,D盘根目录下toFile.txt文件如下所示:
clipboard

4.2、transferTo()函数

  FileChannel.class:: transferTo()函数可以从FileChannel取数据到到另外一个Channel,如下所示(与4.2中的示例相同,唯一的区别在于调用transferTo()函数的是 FileChannel 对象):

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
public class FileChannelTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
      FileChannel fromChannel = fromFile.getChannel();

      RandomAccessFile toFile = new RandomAccessFile("D:\\toFile.txt", "rw");
      FileChannel toChannel = toFile.getChannel();

      long position = 0;
      long count = fromChannel.size();

      fromChannel.transferTo(position, count, toChannel);
   }
}

我的windows操作系统的D盘根目录下有fromFile.txt文件和toFile.txt文件,如下所示:
clipboard
clipboard

程序运行完后,D盘根目录下toFile.txt文件如下所示:
clipboard

4.3、SocketChannel使用transferFrom()函数和transferTo()函数时的问题(这是我的理解,待验证)

  SocketChannel.class::transferTo()函数只会给另一个FileChannel 一次性传输SocketChannel.class当前内部Buffer已经准备好的并可供使用的数据,之后的时间该SocketChannel 还可能接收到更多可用的数据,这些数据不会一次性传输。换句话说,SocketChannel可能无法一次性将 SocketChannel 所请求的全部数据传输到 FileChannel 中。
  SocketChannel.class::transferFrom()函数一次性只会从另一个FileChannel 中最多取SocketChannel.class内部的Buffe能容纳的最大长度,换句话说,如果标题4.1代码中的count(fromFile.txt中的byte数量)>SocketChannel.class内部的Buffe能容纳的最大长度,那么只会传输SocketChannel.class内部的Buffe能容纳的最大长度个byte。

五、Selector

  Java NIO中的Selector是一个可以检查一个或多个 Java NIO的Channel实例的组件,比如,Selector能够确定哪些Channel已准备好进行Read(读取)或Write(写入)操作。通过Selector,单个线程可以管理多个Channel,从而实现对多个网络连接(SocketChannel或ServerSocketChannel或者)的管理。使用单线程来管理多个Channel可以避免多线程场景下管理多个Channel所带来的线程开销(因为操作系统中,线程之间的切换成本较高,而且每个线程在操作系统中也会占用一些内存,因此线程越少越好)。如下所示:
clipboard

5.1、创建Selector

  通过abstract Selector.class::open()函数,伪代码如下:

Selector selector = Selector.open();
5.2、在Channel上注册Selector

  除了FileChannel以外,要使用带有Selector的Channel(比如SocketChannel、ServerSocketChannel、DatagramChannelImpl),必须先在该Channel注册1个Selector,通过使用 SelectableChannel.class::register() 函数(具体实现在AbstractSelectableChannel.class::register()中)来完成,伪代码如下所示:

//要将Channel与Selector配合使用,该Channel必须处于非阻塞模式。
//也是因为这个原因,FileChannel不能与Selector一起使用,因为FileChannel无法切换至非阻塞模式
channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

register() 函数的第2个参数。这是一个Channel上可以注册的事件集合,表示的是你希望通过Selector在Channel中监听哪些特定事件。有以下4种可以在Channel上注册的事件:

事件名 对应值
服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16) Server端的ServerSocketChannel接受Clientt端的连接被称为“接受就绪”(一般在Server端使用,因为Server端要被动接受Client端的连接)
客户端连接服务端事件 SelectionKey.OP_CONNECT(8) Client端成功连接到另一台服务器的Channel被称为“连接就绪”(一般在Client端使用,因为Client端要主动连接Server端)
读事件 SelectionKey.OP_READ(1) 有数据可供读取的Channel被称为“读取就绪”(Server端和Client端的Channel都会使用)
写事件 SelectionKey.OP_WRITE(4) 准备向其写入数据的Channel准备好时,被称为“写入就绪”。(Server端和Client端的Channel都会使用)

如果一个Channel对多个事件感兴趣,可以通过Selector在Channel中监听多个特定事件,伪代码如下所示:

//注册了selector的channel同时监听读事件和写事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
//注册了selector的channel同时监听读事件、写事件、服务端接收客户端连接事件
SelectionKey key = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_ACCEPT);

与Channel相关的类的UML关系图,如下所示:
clipboard

六、SelectionKey

  当在Channel上注册Selector的事件时,register()函数会返回一个 SelectionKey 对象。这个 SelectionKey 对象包含了5个内容:
①、该Channel中监听事件的集合;
②、该Channel中已经准备就绪的事件集合;
③、这个Channel本身;
④、在这个Channel上注册的Selector本身;
⑤、附加一个对象到SelectionKey 对象上;

6.1、该Channel中监听事件的集合

  表示使用者曾经通过Selector在Channel中监听了哪些特定事件。可以在Channel上注册的事件集合在标题5.2中有详细说明(SelectionKey.OP_ACCEPT(16)、SelectionKey.OP_CONNECT(8)、SelectionKey.OP_READ(1)、SelectionKey.OP_WRITE(4)),使用者可以通过 SelectionKey 来读取和写入这些事件集合,伪代码如下:

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = SelectionKey.OP_ACCEPT  == (interests & SelectionKey.OP_ACCEPT);
boolean isInterestedInConnect = SelectionKey.OP_CONNECT == (interests & SelectionKey.OP_CONNECT);
boolean isInterestedInRead    = SelectionKey.OP_READ    == (interests & SelectionKey.OP_READ);
boolean isInterestedInWrite   = SelectionKey.OP_WRITE   == (interests & SelectionKey.OP_WRITE);

如上所示,使用者可以将注册的事件集合与给定的 SelectionKey 常量进行“&”运算,以确定某个事件是否在注册的事件集合中。

6.2、该Channel中已经准备就绪的事件集合

  在Channel上注册Selector之后会得到一个SelectionKey对象,之后的操作基本上都在访问这个已经准备就绪的事件集合。访问已经准备就绪的事件集合,伪代码如下:

//此处的selectionKey变量是SelectionKey类型的
int readySet = selectionKey.readyOps();

使用者可以按照标题6.1中的方式来测试该渠道能够处理哪些事件/操作。不过,使用者也可以采用以下这4种方法,它们都会返回一个布尔值:

//此处的selectionKey变量是SelectionKey类型的
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
6.3、这个Channel本身和在这个Channel上注册的Selector本身

  从 SelectionKey 对象中获取Channel和Selector是非常简单的,伪代码如下所示:

//此处的selectionKey变量是SelectionKey类型的
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();    
6.4、附加一个对象到SelectionKey 对象上

  使用者可以将一个对象附加到一个SelectionKey对象上,也可以通过这种方法来识别特定Channel,如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AttachObjectToChannel {
   public static void main(String[] args) throws IOException {
      ServerSocketChannel serverChannelA = ServerSocketChannel.open();
      Selector selectorA = Selector.open();
      serverChannelA.configureBlocking(false);
      serverChannelA.socket().bind(new InetSocketAddress(8080), 1024);

      ServerSocketChannel serverChannelB = ServerSocketChannel.open();
      Selector selectorB = Selector.open();
      serverChannelB.configureBlocking(false);
      serverChannelB.socket().bind(new InetSocketAddress(9090), 1024);

      SelectionKey selectionKeyA = serverChannelA.register(selectorA, SelectionKey.OP_ACCEPT);
      SelectionKey selectionKeyB = serverChannelB.register(selectorB, SelectionKey.OP_ACCEPT);
      ArrayList<String> list = new ArrayList<>();
      HashMap<String, String> map = new HashMap<>();
      selectionKeyA.attach(list);
      selectionKeyB.attach(map);

      if (selectionKeyA.attachment() instanceof List) {
         System.out.println("selectionKeyA绑定了List对象");
      }

      if (selectionKeyB.attachment() instanceof Map) {
         System.out.println("selectionKeyB绑定了Map对象");
      }
   }
}

以上代码,运行结果如下:
clipboard

或者可以将更多相关数据附加到该Channel上。比如,您可以将正在使用的Buffer对象附加到Channel上,或者附加一个包含更多汇总数据的对象。伪代码如下所示:

ServerSocketChannel serverChannel = ServerSocketChannel.open();
Selector selectorA = Selector.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(8080), 1024);
ByteBuffer directBuffer = ByteBuffer.allocateDirect(48);
SelectionKey selectionKey = serverChannel.register(selectorA, SelectionKey.OP_ACCEPT,directBuffer);

七、通过Selector选择Channel

  一旦使用者通过Selector注册了一个或多个Channel,就可以调用Selector.class::select() 函数。这个函数会返回那些在Channel中监听的并“准备好”的特定事件,可以在Channel上注册的事件集合在标题5.2中有详细说明(SelectionKey.OP_ACCEPT(16)、SelectionKey.OP_CONNECT(8)、SelectionKey.OP_READ(1)、SelectionKey.OP_WRITE(4))。换句话说,如果使用者对准备进行读取操作的Channel感兴趣,那么使用者将从 select() 函数中接收到那些准备进行读取操作的Channel。Selector.class::select() 函数有3个不同入参的重载:

//直到至少有一个Channel准备好使用者所注册的事件时,才会继续执行后续操作。
int select()
//功能与 select() 函数相同,不同之处在于该函数会最多在指定的 timeout 毫秒内进行阻塞。
int select(long timeout)
//selectNow() 函数完全不会阻塞。它会立即返回,并返回当前所有可用的Channel信息。
int selectNow()

select() 函数返回的int类型的值表示有多少Channel已准备好。也就是说,自上次调用 select() 函数以来有多少Channel变得可用。如果调用 select() 并且它返回 1,表示有一个Channel已准备好,并且使用者操作了这个Channel,然后再次调用 select()函数,并且又有一个Channel已准备好,那么它将再次返回 1。
  如果调用 select() 函数并且它返回 1,但是使用者对第一个已准备好的Channel未做任何操作,然后再次调用 select()函数,并且又有一个Channel已准备好,那么它还是返回 1。

7.1、Selector.class::selectedKeys()函数(具体实现在SelectorImpl.class中,环境是windows操作系统)

  一旦使用者调用了其中一个 select() 函数,并且其返回值表明有一个或多个Channel已准备好,使用者就可以通过一个Set集合(该Set是通过Util.class::ungrowableSet()函数获得的)来访问这些已就绪的Channel,伪代码如下所示:

Set<SelectionKey> selectedKeys = selector.selectedKeys();    

使用者可以通过对这个Set集合(该Set是通过Util.class::ungrowableSet()函数获得的)进行迭代操作,以访问已准备好的Channel。伪代码如下所示:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
    
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

此循环会遍历Set集合(该Set是通过Util.class::ungrowableSet()函数获得的)所有元素。对于每个元素,它都会对该元素进行测试,以确定该元素所引用的通道处于何种准备状态。
  需要注意的是,每次迭代结束时的 keyIterator.remove();调用。使用迭代器遍历必须用迭代器删除,否则会触发fast-fail机制。另外,通过 SelectionKey.class::channel() 函数返回的Channel应被转换为您需要操作的Channel类型,例如 ServerSocketChannel、 SocketChannel 或者DatagramChannelImpl。

7.2、Selector.class::wakeUp()函数(具体实现在WindowsSelectorImpl.class,环境是windows操作系统)

  如果有一个调用了Selector的 select() 函数时处于阻塞状态的线程,即便当前没有Channel准备好,也可以通过让另一个线程调用被select()函数阻塞的线程所拥有的 Selector 的 wakeup() 函数来结束阻塞,继续执行,如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class BlockedAndWakeupBySelector {
   private static ServerSocketChannel serverChannel;
   private static Selector selector;

   public static void main(String[] args) throws IOException, InterruptedException {
      serverChannel = ServerSocketChannel.open();
      selector = Selector.open();
      serverChannel.configureBlocking(false);
      serverChannel.socket().bind(new InetSocketAddress(8080), 1024);
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
      new Thread(() -> {
         try {
            long startTime = System.currentTimeMillis();
            int client = selector.select();//因为thread1线程先调用了wakeup()函数,所以当前线程不会在Selector的select()函数处阻塞
            long endTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "线程被阻塞了:" + (endTime - startTime) + "毫秒,接收到:" + client + "个事件");
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            SelectionKey key = null;
            while (iterator.hasNext()) {
               key = iterator.next();
               if (key.isAcceptable()) {
                  // a connection was accepted by a ServerSocketChannel.

               } else if (key.isConnectable()) {
                  // a connection was established with a remote server.

               } else if (key.isReadable()) {
                  // a channel is ready for reading

               } else if (key.isWritable()) {
                  // a channel is ready for writing
               }
               iterator.remove();
            }
         } catch (IOException e) {
            e.printStackTrace();
         }
      }, "selectThread").start();
      Thread.sleep(3000);
      new Thread(() -> {
         selector.wakeup();
      }, "wakeupThread").start();
   }
}

以上代码,运行结果如下:
clipboard

  如果2个线程都可以使用同一个Selector对象,线程1先调用了Selector的wakeup()函数,那么线程2在调用Selector的select()函数时,不会被阻塞。如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class BlockedAndWakeupBySelector {
   private static ServerSocketChannel serverChannel;
   private static Selector selector;

   public static void main(String[] args) throws IOException, InterruptedException {
      serverChannel = ServerSocketChannel.open();
      selector = Selector.open();
      serverChannel.configureBlocking(false);
      serverChannel.socket().bind(new InetSocketAddress(8080), 1024);
      serverChannel.register(selector, SelectionKey.OP_ACCEPT);
      new Thread(() -> {
         selector.wakeup();
      }, "wakeupThread").start();
      
      Thread.sleep(3000);
      
      new Thread(() -> {
         try {
            long startTime = System.currentTimeMillis();
            int client = selector.select();//因为thread1线程先调用了wakeup()函数,所以当前线程不会在Selector的select()函数处阻塞
            long endTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "线程被阻塞了:" + (endTime - startTime) + "毫秒,接收到:" + client + "个事件");
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            SelectionKey key = null;
            while (iterator.hasNext()) {
               key = iterator.next();
               if (key.isAcceptable()) {
                  // a connection was accepted by a ServerSocketChannel.

               } else if (key.isConnectable()) {
                  // a connection was established with a remote server.

               } else if (key.isReadable()) {
                  // a channel is ready for reading

               } else if (key.isWritable()) {
                  // a channel is ready for writing
               }
               iterator.remove();
            }
         } catch (IOException e) {
            e.printStackTrace();
         }
      }, "selectThread").start();
   }
}

以上代码,运行结果如下:
clipboard

7.3、Selector.class::close()函数(具体实现在AbstractSelector.class,环境是windows操作系统)

  当使用者使用完Selector后,需调用close()函数。此方法会关闭Selector,并使与该Selector关联的所有SelectionKey实例失效。但Channel本身并不会被关闭。

八、FileChannel

  NIO中的FileChannel是一种与文件相连接的Channel,代替了传统的 Java IO API 来读取文件的方式。通过使用FileChannel,使用者可以从文件中读取数据,并向文件中写入数据。同时,使用者无法将FileChannel设置为非阻塞(non-blocking mode)模式,它始终处于阻塞模式(blocking mode)运行状态。

8.1、创建FileChannel的方式

  在能够使用FileChannel之前,必须先创建一个FileChannel。FileChannel需要通过InputStream、OutputStream或RandomAccessFile来获取。以下是通过RandomAccessFile获取FileChannel的方法,伪代码如下所示:

RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
8.2、从FileChannel中读取数据的方式

  使用者可以调用FileChannel的read()函数从FileChannel中读取数据,伪代码如下所示:

//首先会分配一个Buffer(此处分配的是HeapByteBuffer对象)。从FileChannel中读取的数据会被读入该HeapByteBuffer对象中。
ByteBuffer buf = ByteBuffer.allocate(48);
//此函数从 FileChannel 中读取数据并将其写入 Buffer 中,read() 函数的返回值表示已写入 Buffer 中的字节数。如果返回值为 -1,则表示到达文件末尾。
int bytesRead = inChannel.read(buf);
8.3、向FileChannel中写数据的方式

  使用者可以调用FileChannel的write()函数将Buffer中的数据写入到FileChannel中,伪代码如下所示:

String newData = "New String to write to file..." + System.currentTimeMillis();

ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());

buf.flip();
//write() 函数向 FileChannel 写入的字节数是无法保证的。因此,需要在while循环中重复调用 write() 函数,直到Buffer中没有更多的字节要写入为止。
while(buf.hasRemaining()) {
    channel.write(buf);
}
8.4、关闭FileChannel的方式

  不再使用的FileChannel需要关闭,伪代码如下所示:

channel.close(); 
8.5、FileChannel中的position

  在对FileChannel进行Read(读取)或Write(写入)操作时,需要指定一个特定的position(位置)。使用者可以通过调用 position()函数来获取FileChannel对象的当前position,使用者也可以通过调用 position(long pos) 函数来设置FileChannel的位置。如下所示:

long pos = channel.position();
channel.position(pos +123);

  如果通过一个FileChannel将position设置到了文件末尾,然后尝试从该FileChannel读取数据,将会得到 -1(文件结束标记)。
  如果在文件末尾设置position,并向FileChannel写入数据,那么文件将会被扩展以适应该position和所写入的数据。这种方式可能导致“file hole”(文件漏洞),即磁盘上的物理文件在所写入的数据中会出现空白区域,如下所示:

  • position的值刚好在文件末尾(fromFile.txt文件中共有31个byte),如下所示:
package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileHoleTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
      FileChannel fileChannel = fromFile.getChannel();
      String newData = "New String to write to file..." + System.currentTimeMillis();
      ByteBuffer buf = ByteBuffer.allocate(48);
      buf.clear();
      buf.put(newData.getBytes());
      buf.flip();
      fileChannel.position(31);
      while(buf.hasRemaining()) {
         fileChannel.write(buf);
      }
   }
}

我的windows操作系统的D盘根目录下有fromFile.txt文件,如下所示:
clipboard

程序运行完后,D盘根目录下fromFile.txt文件如下所示(没有出现文件漏洞):
clipboard

  • position的值超过了文件末尾(原本的fromFile.txt文件中共有31个byte,此处将position设置为33),
package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileHoleTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
      FileChannel fileChannel = fromFile.getChannel();
      String newData = "New String to write to file..." + System.currentTimeMillis();
      ByteBuffer buf = ByteBuffer.allocate(48);
      buf.clear();
      buf.put(newData.getBytes());
      buf.flip();
      fileChannel.position(33);
      while(buf.hasRemaining()) {
         fileChannel.write(buf);
      }
   }
}

我的windows操作系统的D盘根目录下有fromFile.txt文件,如下所示:
clipboard

程序运行完后,D盘根目录下fromFile.txt文件如下所示(出现2个byte的文件漏洞):
clipboard

8.6、FileChannel中的size

  FileChannel 的 size() 函数会返回该Channel所连接文件的大小,如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
public class FileChannelTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
      FileChannel fileChannel = fromFile.getChannel();
      System.out.println("fromFile.txt文件的长度为:"+fileChannel.size());
   }
}

我的windows操作系统的D盘根目录下有fromFile.txt文件,如下所示:
clipboard

该程序的运行结果,如下所示:
clipboard

8.7、用FileChannel截断1个文件

  可以通过调用FileChannel的truncate()函数(具体实现在FileChannelImpl.class,环境是windows操作系统)来截断文件。当您截断一个文件时,会将其截取到指定的长度处。如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
public class FileChannelTest {
   public static void main(String[] args) throws IOException {
      RandomAccessFile fromFile = new RandomAccessFile("D:\\fromFile.txt", "rw");
      FileChannel fileChannel = fromFile.getChannel();
      fileChannel.truncate(12);
   }
}

我的windows操作系统的D盘根目录下有fromFile.txt文件,如下所示:
clipboard

程序运行完后,D盘根目录下fromFile.txt文件如下所示(只剩12个byte的长度了):
clipboard

8.8、FileChannel的force()函数(具体实现在FileChannelImpl.class,环境是windows操作系统)

  FileChannel 的 force()函数会将FileChannel中尚未写入磁盘的数据全部写入磁盘。由于操作系统出于性能考虑可能会将数据缓存在内存中,所以在调用 force() 函数之前,无法保证FileChannel的数据确实已写入磁盘。
  force()函数接受一个布尔值作为参数,该参数也表示是否一并刷新文件的元数据(如权限等)。

channel.force(true);

九、SocketChannel

  NIO 中的 SocketChannel 是一种与 TCP 网络套接字相连接的Channel,也是NIO中与 Java 网络中的套接字功能相对应的部分。SocketChannel 的创建分为Clinet(客户端)端的SocketChannel和Server(服务端)端的ServerSocketChannel这2个SocketChannel 。

9.1、Clinet(客户端)端的SocketChannel

  使用者可以打开一个SocketChannel,并连接到互联网上的某个服务器(该域名的服务器必须开通对应的端口,否则会报UnresolvedAddressException),伪代码如下所示:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("example.com", 80));

  如果在非阻塞模式下。SocketChannel的connect()函数是异步的(异步的表明connect()函数会在连接建立之前返回)。如果接下来还要确定连接是否已建立,需要调用 finishConnect() 函数,伪代码如下所示:

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("example.com", 80));
while(! socketChannel.finishConnect() ){
    //wait, or do something else...    
}
9.2、SocketChannel的close()函数

在使用完SocketChannel后,可以调用SocketChannel的close()函数来关闭该Channel。伪代码如下所示:

socketChannel.close();    
9.3、SocketChannel的read()函数

  要从SocketChannel读取数据,可以调用SocketChannel的read()函数。如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class SocketChannelTest {
   public static void main(String[] args) throws IOException {
      SocketChannel socketChannel = SocketChannel.open();
      //该域名的服务器必须开通对应的端口,否则会报UnresolvedAddressException
      socketChannel.connect(new InetSocketAddress("example.com", 80));
      //首先会分配一个Buffer(此处分配的是HeapByteBuffer对象)。从SocketChannel中读取的数据会被读入该HeapByteBuffer对象中。
      ByteBuffer buffer = ByteBuffer.allocate(4096);
      //此函数从 SocketChannel中读取数据并将其写入 Buffer 中,read() 函数的返回值表示已经从Channel读取到 Buffer 中的字节个数。如果返回值为 -1,则表示已经读到了流的末尾(连接已关闭)。
      int bytesRead = socketChannel.read(buffer);
      buffer.flip();
      RandomAccessFile randomAccessFile = new RandomAccessFile("D:\\nio-data.txt", "rw");
      FileChannel fileChannel = randomAccessFile.getChannel();
      while (buffer.hasRemaining()) {
         fileChannel.write(buffer);
      }
   }
}

  如果在非阻塞模式下。SocketChannel的read()函数是异步的(异步的表明read()函数会在没有读取任何数据的情况下就返回)。因此,使用者需要留意read()函数返回的整数,它会告知使用者已读取的字节数量。如下所示(多了一个bytesRead>-1的判断,其余都一样):

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class SocketChannelTest {
   public static void main(String[] args) throws IOException {
      SocketChannel socketChannel = SocketChannel.open();
      //该域名的服务器必须开通对应的端口,否则会报UnresolvedAddressException
      socketChannel.connect(new InetSocketAddress("example.com", 80));
      //首先会分配一个Buffer(此处分配的是HeapByteBuffer对象)。从SocketChannel中读取的数据会被读入该HeapByteBuffer对象中。
      ByteBuffer buffer = ByteBuffer.allocate(4096);
      //此函数从 SocketChannel中读取数据并将其写入 Buffer 中,read() 函数的返回值表示已经从Channel读取到 Buffer 中的字节个数。如果返回值为 -1,则表示已经读到了流的末尾(连接已关闭)。
      int bytesRead = socketChannel.read(buffer);
      buffer.flip();
      RandomAccessFile randomAccessFile = new RandomAccessFile("D:\\nio-data.txt", "rw");
      FileChannel fileChannel = randomAccessFile.getChannel();
      while (buffer.hasRemaining() && bytesRead>-1) {
         fileChannel.write(buffer);
      }
   }
}
9.4、SocketChannel的write()函数

  将数据写入SocketChannel,可以调用 SocketChannel 的 write() 函数来实现的,该函数以 Buffer 类型的参数作为入参。如下所示:

package com.chelong.channel;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class SocketChannelTest {
   public static void main(String[] args) throws IOException {
      SocketChannel socketChannel = SocketChannel.open();
      //该域名的服务器必须开通对应的端口,否则会报UnresolvedAddressException
      socketChannel.connect(new InetSocketAddress("example.com", 80));
      String newData = "New String to write to file..." + System.currentTimeMillis();
      ByteBuffer buf = ByteBuffer.allocate(48);
      buf.clear();
      buf.put(newData.getBytes());
      buf.flip();
      while (buffer.hasRemaining()) {
         socketChannel.write(buffer);
      }
   }
}

  如果在非阻塞模式下。SocketChannel的write()函数是异步的(异步的表明write()函数会在没有完成写入操作的情况下就返回)。由于是在 while 循环中执行的写入操作,所以在非阻塞模式下使用write()函数和阻塞模式下使用write()函数相同。

十、ServerSocketChannel

  NIO 中的 ServerSocketChannel 是一种能够监听 TCP 连接请求的Channel,一般用在Server(服务端)端,其功能与标准 Java 网络中Clinet(客户端)端的SocketChanne相同(SocketChanne在标题九中)。

10.1、Server(服务端)端ServerSocketChannel

  使用者可以通过open()函数打开一个ServerSocketChannel ,并绑定一个操作系统上面的端口,IP默认为127.0.0.1(localhost),伪代码如下所示:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));//java.net.InetSocketAddress
10.2、ServerSocketChannel 的close()函数

  与SocketChannel 相同,略;

10.3、监听连接请求

  ServerSocketChannel 的 accept() 函数可以用来监听新连接的建立。当 accept() 函数返回时,它会返回一个带有新连接的 SocketChannel对象。因此,ServerSocketChannel 的 accept()函数会阻塞,直到有新的连接到来。一般在while循环中使用,伪代码如下所示:

while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    //do something with socketChannel...
}

  如果在非阻塞模式下。ServerSocketChannel 的 accept() 函数是异步的(异步的表明accept()函数会在连接没有到来(建立)之前返回null)。因此,在非阻塞模式下,上面的伪代码需要修改为:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);//设置为非阻塞
while(true){
    //设置为非阻塞之后,accept()函数在连接没有到来(建立)之前会返回null
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel != null){//因此要判空后再使用SocketChannel 对象
        //do something with socketChannel...
    }
}
posted @ 2026-02-03 09:35  Carey_ccl  阅读(5)  评论(0)    收藏  举报