Java的IO流

2020/10/16

1、简介

Java IO,Input/Output输入输出,指数据在内部存储器和外部存储器或其他周边设备之间的输入和输出

java.io 包下,基本可以分为四组

  • 基于字节操作的 I/O 接口:InputStream 和 OutputStream
  • 基于字符操作的 I/O 接口:Writer 和 Reader
  • 基于磁盘操作的 I/O 接口:File
  • 基于网络操作的 I/O 接口:Socket

前两组主要从数据格式的不同进行分组,字节与字符

后两组主要从传输方式的不同进行分组,磁盘与网络

I/O 的核心问题,要么是数据格式影响 I/O 操作,要么是传输方式影响 I/O 操作,也就是将什么样的数据写到什么地方的问题(是磁盘或网络)。I/O 只是人与机器或者机器与机器交互的手段,除了在它们能够完成这个交互功能外,我们关注的就是如何提高它的运行效率了,而数据格式传输方式是影响效率最关键的因素。

2、基于字节操作的接口

InputStream字节流输入

InputStream是一个抽象类,它的类继承层次如下图:

根据数据节点类型和处理方式,划分如下

OutputStream字节流输出

OutputStream是一个抽象类,它的类继承层次与inputStream相对,如下图:

同样根据数据节点类型和处理方式,划分如下

无论是输入还是输出,操作数据的方式可以组合使用,各个处理流的类并不是只操作固定的节点流,比如如下输出方式:

//将文件输出流包装到序列化输出流中,再将序列化输出流包装到缓冲中
OutputStream out = new BufferedOutputStream(new ObjectOutputStream(new FileOutputStream(new File("fileName")))

另外,输出流最终写到什么地方必须要指定,要么是写到硬盘中,要么是写到网络中。写网络实际上也是写文件,只不过写到网络中,需要经过底层操作系统将数据发送到其他的计算机中,而不是写入到本地硬盘中。

web文件从服务端下载,一般可以用静态文件映射,或者把文件上传到OSS通过下载链接获取,还有一种方式请求服务端上的文件,就是通过字节流的方式把文件输出到请求中,代码如下

public static void downLoadTemplate(String fileName) {
  HttpServletResponse response = ExcelUtils.getResponse();
  InputStream bis = null;
  BufferedOutputStream out = null;

  try {
    ClassPathResource classPathResource = new ClassPathResource("template/" + fileName);
    bis = classPathResource.getInputStream();
    fileName = URLEncoder.encode(fileName, "UTF-8");
    //设置文件下载头
    response.addHeader("Content-Disposition", "attachment;filename=" + fileName);
    response.setContentType("multipart/form-data");
    out = new BufferedOutputStream(response.getOutputStream());
    int len = 0;
    while ((len = bis.read()) != -1) {
      out.write(len);
      out.flush();
    }
    out.close();
  } catch (BussinessException | IOException e) {
    e.printStackTrace();
    throw new BussinessException(ErrorEnum.FAIL, "未找到对应模板");
  } finally {
    try {
      if (ObjectUtils.isNotEmpty(bis)) {
        bis.close();
      }

      if (ObjectUtils.isNotEmpty(out)) {
        out.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

缺点:与静态文件映射一样,只适合单体应用

3、基于字符操作的接口

不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符,所以 I/O 操作的都是字节而不是字符,但是为什么要有操作字符的 I/O 接口呢?

这是因为我们的程序中通常操作的数据都是以字符形式,为了程序操作更方便而提供一个直接写字符的 I/O 接口,仅此而已。

基于字符的输入和输出操作接口分别是:Reader 和 Writer ,下图是字符的 I/O 操作接口涉及到的类结构图。

Reader字符输入流

Reader是一个抽象类,它的类继承层次,如下图

根据数据节点类型和处理方式,划分如下:

Writer字符输出流

Writer是一个抽象类,它的类继承层次,如下图

根据数据节点类型和处理方式,划分如下:

不管是 Reader 还是 Writer 类,它们都只定义了读取或写入数据字符的方式,也就是说要么是读要么是写,但是并没有规定数据要写到哪去,写到哪去就是我们后面要讨论的基于磁盘或网络的工作机制。

4、字节与字符的转化

不管是磁盘还是网络传输,最小的存储单元都是字节,而不是字符,设计字符流的原因是为了程序操作更方便。相互转化的类:

  • InputStreamReader
  • OutputStreamWriter

输入流转化过程

从图中看到,InputStreamReader 类是字节到字符的转化桥梁, 其中StreamDecoder指的是一个解码操作类,Charset指的是字符集。

InputStream 到 Reader 的过程需要指定编码字符集,否则将采用操作系统默认字符集,很可能会出现乱码问题,StreamDecoder 则是完成字节到字符的解码的实现类。

点击源码部分,可以看到InputStream到Reader的转化过程

输出流转化过程

从图中看到,OutputStreamWriter类是字节到字符的转化桥梁,其中StreamEncoder完成编码过程。

点击源码部分,可以看到OutputStream到Writer的转化过程

5、基于磁盘操作的接口

文件是操作系统和磁盘驱动器交互的一个最小单元。

在 Java I/O 体系中,File 类是唯一代表磁盘文件本身的对象,它定义了一些常用的操作

  • 文件是否存在

  • 文件创建

  • 文件删除

  • 重命名文件

  • 判断文件的读写权限、设置和查询文件的最近修改时间

Java 中通常的 File 并不代表一个真实存在的文件对象,当你通过指定一个路径描述符时,它就会返回一个代表这个路径相关联的一个虚拟对象,这个可能是一个真实存在的文件或者是一个包含多个文件的目录。

操作流程如下:

点进源码

可以看到,本质是创建一个FileInputStream对象来读取文件内容,也就是字节流读取文件。

最终会创建一个FileDescriptor对象来描述底层操作系统关联的文件

6、基于网络操作的接口

把数据写入网络,最终也是写到磁盘中,不是本地磁盘,是经过操作系统将数据发送到其他计算机中。

Socket

大多时候我们使用socket都是基于TCP/IP的流套接字,在OSI七层网络模型中,Socket工作在会话层。

TCP协议的三次握手

SYN:Synchronize Sequence Number 同步序列编号,TCP连接时使用的握手信号

ACK:Acknowledge character 即确认字符,表示发来的数据已确认接收无误。

传输数据

客户端创建Socket 实例

public static void main(String[] args) throws IOException {
  Socket socket = new Socket("127.0.0.1",8080);
  BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
  String str="hello,我是客户端";
  bufferedWriter.write(str);
  bufferedWriter.flush(); // 发送
  bufferedWriter.close(); // 关闭连接
}

与之对应的服务端

public static void main(String[] args) throws IOException {
  ServerSocket serverSocket = new ServerSocket(8080);
  // 循环监听客户端请求
  while(true){
    Socket socket = serverSocket.accept(); // 阻塞线程,等待客户端请求
    // 读取客户端输入的内容
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    // 读取一行数据
    String s = bufferedReader.readLine();
    System.out.println("收到客户端的消息:"+s);
  }
}

先启动服务端,再运行客户端,结果

注意,客户端只有与服务端建立三次握手成功之后,才会发送数据,而 TCP/IP 握手过程,底层操作系统已经帮我们实现了。当连接已经建立成功,服务端和客户端都会拥有一个 Socket 实例,每个 Socket 实例都有一个 InputStreamOutputStream,正如我们前面所说的,网络 I/O 都是以字节流传输的,Socket 正是通过这两个对象来交换数据。

当 Socket 对象创建时,操作系统将会为 InputStream 和 OutputStream 分别分配一定大小的缓冲区,数据的写入和读取都是通过这个缓存区完成的。

写入端将数据写到 OutputStream 对应的 SendQ 队列中,当队列填满时,数据将被发送到另一端 InputStream 的 RecvQ 队列中,如果这时 RecvQ 已经满了,那么 OutputStream 的 write 方法将会阻塞直到 RecvQ 队列有足够的空间容纳 SendQ 发送的数据。

IO的工作方式

  • 同步阻塞BIO(Blocking IO):即传统的IO模型。

  • 同步非阻塞NIO(Non Blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK

  • 多路复用MIO(Multiplexing IO ):即经典的Reactor设计模式,有时也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。Redis单线程为什么速度还那么快,就是因为用了多路复用IO和缓存操作的原因,多个网络连接请求复用同一个线程IO(selector选择器)处理。

  • 异步AIO(Asynchronous IO):即经典的Proactor设计模式,有时也称异步非阻塞IO

概念扫盲

  • 同步:发起一个请求后,接受者未处理完请求之前,不返回结果;
  • 异步:发起一个请求后,立刻得到接受者的回应表示已接收到请求。但是接受者并没有处理完,通常是依靠事件回调等机制来通知请求者其处理结果;
  • 阻塞:发起一个请求后,一直等待其请求结果返回,也就是当前线程会被挂起,无法从事其他任务,只有当条件就绪才能继续;
  • 非阻塞:发起一个请求后,不用一直等着结果返回,可以先去干其他事情,当条件就绪的时候,就自动回来。

常常同步是阻塞的,异步是非阻塞的。

BIO同步阻塞

采用 BIO 通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接。

客户端多线程请求

public static void main(String[] args)  {
  // 创建5个线程
  for(int i=0;i<5;i++){
    final int j=i+1;
    new Thread(()->{
      try {
        Socket socket = new Socket("127.0.0.1",8080);
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        String str="hello,我是第"+j+"个客户端";
        bufferedWriter.write(str);
        bufferedWriter.flush(); // 发送
        bufferedWriter.close(); // 关闭连接
        socket.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }).start();
  }
}

对应的服务端,多线程处理请求

public static void main(String[] args) {
	   	// 创建5个线程来监听客户
		for (int i = 0; i < 5; i++) {
			final int j = i+1;
			new Thread(()->{
				try {
					ServerSocket serverSocket = new ServerSocket(8080);
					// 循环监听客户端请求
					while(true){
						Socket socket = serverSocket.accept();
						// 读取客户端输入的内容
						BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
						// 读取一行数据
						String s = bufferedReader.readLine();
						System.out.println("收到客户端的消息:"+s);
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}).start();
		}
	}

如果出现 100、1000、甚至 10000 个用户同时访问服务器,这个时候,如果使用这种模型,那么服务端也会创建与之相同的线程数量,线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终导致进程宕机或者僵死,不能对外提供服务

这时我们应该使用ThreadPoolExecutor 线程池机制避免创建大量线程,导致资源耗尽。这时IO模型如下图:

当有新的客户端接入时,将客户端的 Socket 封装成一个 Task 投递到后端的线程池中进行处理

客户端创建30个socket线程,代码如下:

public static void main(String[] args)  {
  // 创建30个线程
  for(int i=0;i<30;i++){
    final int j=i+1;
    new Thread(()->{
      try (Socket socket = new Socket("127.0.0.1",8080);
           PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true);
           BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
          ){
        // 发送信息给服务端
        String str="hello,我是第"+j+"个客户端";
        printWriter.println(str);

        // 读取服务端返回的信息
        String result = bufferedReader.readLine();
        System.out.printf("客户端发送内容:%s -> 收到服务端返回的内容:%s\n",str,result);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }).start();
  }
}

服务端使用线程池处理socket信息,代码如下

public static void main(String[] args) throws IOException {
  ServerSocket serverSocket = new ServerSocket(8080);
  ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS
                                                      , new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

  while(true){
    // 监听客户请求
    Socket socket = serverSocket.accept();
    // 使用线程池处理请求
    threadPool.execute(()->{
      try (
        // 读取客户端输入的内容
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        // 向客户端返回信息,将字符转化成字节流,并输出
        PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()));
      ){
        // 读取一行数据
        String str = bufferedReader.readLine();
        System.out.println(Thread.currentThread().getName()+",收到客户端的消息:"+str);

        printWriter.println("服务端返回信息:"+str);
      }catch (Exception e){
        e.printStackTrace();
      }
    });
  }
}

先启动服务端,再执行客户端。

服务端输出

客户端输出,收到服务端的返回

服务端使用线程池,设置了最大线程数为10,阻塞队列数为10,它的资源占用是可控的,避免了资源耗尽。

它的底层仍然是同步阻塞的 BIO 模型,当面对十万甚至百万级连接的时候,传统的 BIO 模型真的是无能为力的,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

NIO同步非阻塞

默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK,从java1.4引入了NIO,在java.nio包下。

NIO提供了与传统BIO模型中的SocketServerSocket相对应的SocketChannelServerSocketChannel两种不同的套接字通道实现。

  • BIO同步阻塞,适合低负载、低并发的应用程序,提升开发效率
  • NIO同步非阻塞,适合高负载,高并发的网络应用

核心关联类图:

三个关键类:

  • Channel 通道
  • Selector 选择器
  • Buffer 数据缓冲流

Channel可以比作是某种具体的交通工具,如汽车、高铁、飞机,而Selector可以比作是一个车站的车辆运行调度系统,负责监控每辆车的当前运行状态:已经出站或者是在路上等,Selector会轮询每个Channel的状态。Buffer可以比作是交通工具上的座位,Channel是飞机,Buffer就是飞机上的座位。

NIO 引入了 Channel、Buffer 和 Selector 就是想把 IO 传输过程中涉及到的信息具体化,让程序员有机会去控制它们。

传统的网络IO操作中,当调用write()往 Socket 中的 SendQ 队列写数据时,当一次写的数据超过 SendQ 长度时,操作系统会按照 SendQ 的长度进行分割,这个过程中需要将用户空间数据和内核地址空间进行切换,而这个切换不是程序员可以控制的,由底层操作系统来帮我们处理。

而Buffer 中,我们可以控制 Buffer 的 capacity(容量),并且是否扩容以及如何扩容都可以控制。

客户端代码不用变化

服务端代码

public class NioServer {
    public static void main(String[] args) throws IOException {
        // 1、打开服务器套接字通道(交通工具)
        ServerSocketChannel channel = ServerSocketChannel.open();
        // 配置为非阻塞
        channel.configureBlocking(false);
        // 进行服务的绑定,监听8080端口
        channel.bind(new InetSocketAddress(8080));

        // 2、通过open方法找到Selector选择器(车辆监控系统)
        Selector selector = Selector.open();
        // 将channel注册到selector中,并且让selector监听通道中的接受事件
        channel.register(selector, SelectionKey.OP_ACCEPT);
        // 3、监听事件
        while (true){
            // 查询指定事件已经就绪的通道数量,如果为0就跳出
            int readyChannels = selector.select();
            if(0 == readyChannels) continue;
            // 通过选择器取得所有的key集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                // 判断状态是否有效
                if(!key.isValid()){
                    continue;
                }

                if(key.isAcceptable()){
                    // 通道接收就绪
                    ServerSocketChannel ssChanel = (ServerSocketChannel) key.channel();
                    SocketChannel clientChannel = ssChanel.accept();// 阻塞
                    clientChannel.configureBlocking(false);
                    // 将通道注册到选择器并监听通道中可读事件
                    clientChannel.register(selector,SelectionKey.OP_READ);
                    System.out.println("接收到新的客户端连接,地址:" + clientChannel.getRemoteAddress());
                }else if(key.isReadable()){
                    // 通道可读就绪
                    // 创建一个容量为1024的字节数据
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    // 获取通道
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    // 从通道中读取数据到缓冲(写到缓冲中)
                    int numRead = clientChannel.read(byteBuffer);
                    byteBuffer.flip(); // 缓冲写模式切换到读模式
                    // 获取缓冲中的数据
                    String result = new String(byteBuffer.array(),0,numRead);
                    System.out.println("服务端收到客户端发送的消息:"+result);
                    // 将通道注册到选择器并监听通道中可写事件
                    clientChannel.register(selector,SelectionKey.OP_WRITE);
                }else if(key.isWritable()){
                    // 通道可写就绪
                    // 创建一个容量为1024的字节数据
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    // 获取通道
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    byteBuffer.put("server send".getBytes());
                    byteBuffer.flip(); // 写模式切换到读模式
                    // 缓冲数据写入通道
                    clientChannel.write(byteBuffer);
                    // 将通道注册到选择器并监听通道中可读事件
                    clientChannel.register(selector,SelectionKey.OP_READ);
                    clientChannel.close(); // 关闭通道
                }
                keyIterator.remove(); // 该事件已经处理,可以丢弃
            }
        }

    }
}

关于buffer.flip()方法,它的作用是写进buffer后再flip(),可以读出buffer中的数据。buffer底层实现是数组,核心概念有capacity=容量数组大小,position就是读取或者写入时的下标,limit就是当前读写的最大下标,如下图:

flip的源码

public final Buffer flip() {
  limit = position;
  position = 0;
  mark = -1;
  return this;
}

先启动服务端,再执行客户端,服务端输出:

客户端输出:

客户端也可以使用SocketChannel的写法,如下:

public class NioClient {
    public static void main(String[] args) throws IOException {
        // 写入缓冲buffer,容量1024个字节
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        // 读取缓冲buffer
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        // 1、打开通道,
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false); // 设置为非阻塞
        // 连接服务器地址和端口
        socketChannel.connect(new InetSocketAddress("127.0.0.1",8080));

        // 2、打开选择器
        Selector selector = Selector.open();
        // 通道注册到selector中,并且让selector监听连接事件
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // 查询指定事件已经就绪的通道数量
        while(selector.select()>0){
            // 通过选择器取得所有的key集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            // 迭代key
            while (keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                if(key.isConnectable()){
                    // 通道连接就绪
                    if(socketChannel.finishConnect()){
                        // 完成连接
                        // 让selector监听写事件
                        socketChannel.register(selector,SelectionKey.OP_WRITE);
                        System.out.println("连接目标服务器....");
                    }
                }else if(key.isWritable()){
                    // 通道可写就绪
                    writeBuffer.clear(); // 清空
                    writeBuffer.put("Hello,我是客户端".getBytes());
                    writeBuffer.flip(); // 写模式切换为读模式
                    socketChannel.write(writeBuffer);

                    // 让selector监听读事件
                    socketChannel.register(selector,SelectionKey.OP_READ);
                }else if(key.isReadable()){
                    // 通道可读就绪
                    // 获取通道
                    SocketChannel client = (SocketChannel) key.channel();
                    readBuffer.clear(); // 清空
                    // 从通道读取数据写到缓冲中
                    client.read(readBuffer);
                    readBuffer.flip(); // 写模式切换为读模式
                    String result = Charset.defaultCharset().newDecoder().decode(readBuffer).toString();
                    System.out.println("收到服务端:"+client.socket().getRemoteSocketAddress() + ",返回消息:"+result);

                    // 继续让selector监听写入事件
                    client.register(selector,SelectionKey.OP_WRITE);
                    client.close(); // 读完并关闭客户端通道
                }
                keyIterator.remove(); // 该事件已经处理,可以丢弃
            }
        }
    }
}

执行结果

总的来说,NIO操作比BIO的操作要复杂。

Selector 被称为选择器多路复用器 ,这时也称为MIO。它是 Java NIO 核心组件中的一个,用于检查一个或多个 Channel(通道)的状态是否处于连接就绪接受就绪可读就绪可写就绪。如此可以实现单线程管理多个 channels,也就是可以管理多个网络连接,如下图:

总结:

相比BIO的线程池管理IO,selector使用了更少线程处理通道,实现了网络高效传输。

缺点:

  • 编程复杂
  • JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%
  • 自行实现的NIO维护成本高

后面使用Google的Netty框架替代NIO实现网络高效传输。

AIO异步非阻塞

Asynchronous IO,从java7引入NIO的改进版NIO2,它是异步非阻塞的模型

异步IO是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

客户端示例代码:

public class AioClient {
  public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
    // 1、打开一个客户端通道
    AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
    // 与服务器建立连接
    channel.connect(new InetSocketAddress("127.0.0.1",8080));

    // 睡眠1秒
    TimeUnit.SECONDS.sleep(1);
    // 2、向服务器发送数据,并阻塞等待返回结果
    channel.write(ByteBuffer.wrap("hello,我是客户端".getBytes())).get();

    // 3、从服务器读取数据
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    // 将通道中的数据写入buffer
    channel.read(byteBuffer).get(); // 阻塞等待返回结果
    byteBuffer.flip(); // 写模式切换为读模式
    String result = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
    System.out.println("收到服务端返回的消息:"+result);
  }
}

服务端示例代码

public class AioServer {
  public AsynchronousServerSocketChannel serverSocketChannel;

  public void listen() throws IOException {
    // 1、打开一个服务端通道
    serverSocketChannel= AsynchronousServerSocketChannel.open();
    // 监听8080端口
    serverSocketChannel.bind(new InetSocketAddress(8080));

    // 2、服务监听
    serverSocketChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AioServer>() {
      @Override
      public void completed(AsynchronousSocketChannel client, AioServer attachment) {
        try {
          if(client.isOpen()){
            System.out.println("接收到新的客户端连接,地址:"+client.getRemoteAddress());
            final ByteBuffer buffer = ByteBuffer.allocate(1024);
            // 读取客户端发送的消息
            client.read(buffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>() {
              @Override
              public void completed(Integer result, AsynchronousSocketChannel attachment) {
                try {
                  buffer.flip(); // 缓冲区由写模式切换为读模式
                  String content = Charset.defaultCharset().newDecoder().decode(buffer).toString();
                  System.out.println("服务端收到客户端发送的消息:"+content);

                  // 向客户端发送消息
                  ByteBuffer writeBuffer= ByteBuffer.allocate(1024);
                  writeBuffer.put("server send".getBytes());
                  writeBuffer.flip();
                  attachment.write(writeBuffer).get();
                } catch (Exception e) {
                  e.printStackTrace();
                }
              }

              @Override
              public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
                // 读取消息失败
                try {
                  exc.printStackTrace();
                  attachment.close();
                } catch (IOException e) {
                  e.printStackTrace();
                }
              }
            });
          }
        } catch (IOException e) {
          e.printStackTrace();
        } finally {
          // 当有新客户端接入的时候,直接调用accept方法
          attachment.serverSocketChannel.accept(attachment,this);
        }
      }

      @Override
      public void failed(Throwable exc, AioServer attachment) {
        // 客户端接收失败
        exc.printStackTrace();
      }
    });
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    // 启动服务器,并监听客户端
    new AioServer().listen();
    // 因为是异步IO执行,让主线程睡眠但不关闭
    Thread.sleep(Integer.MAX_VALUE);
  }
}

先启动服务端,多次执行客户端

服务端执行结果:

客户端:

从代码可以看出,这种组合方式用起来比较复杂,只有在一些非常复杂的分布式情况下使用,像集群之间的消息同步机制一般用这种 I/O 组合方式。如 Cassandra 的 Gossip 通信机制就是采用异步非阻塞的方式。Netty 之前也尝试使用过 AIO,不过又放弃了。

Post Directory