BIO、NIO编程与直接内存、零拷贝

Socket

Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口,其实就是一个门面模式。
本质上就是操作系统提供的一系列的API

在这里插入图片描述
网络通信编程基本常识
服务端、客户端、通信编程关注的三件事
连接(客户端连接服务器,服务器等待和接受连接)、读网络数据写网络数据

BIO

BIO,意为 Blocking I/O,即阻塞的 I/O。

在 BIO 中类 ServerSocket 负责绑
定 IP 地址,启动监听端口,等待客户连接;客户端 Socket 类的实例发起连接操作,ServerSocket接受连接后产生一个新的服务端 socket 实例负责和客户端 socket 实例通过输入和输出流进行通信。

在这里插入图片描述

NIO

NIO 有三大核心组件:Selector 选择器Channel 管道buffer 缓冲区

面向流于面向缓冲
阻塞与非阻塞IO

Selector 选择器
socket 包装变成channel

Reactor模式
注册感兴趣的事件 -> 扫描是否有感兴趣的事件发生 ->事件发生后做出相应的处理

核心:面向缓冲
在这里插入图片描述

在这里插入图片描述
Selector
多路复用
Selector 的英文含义是“选择器”,也可以称为为“轮询代理器”、“事件订阅器”、“channel容器管理机”都行。
Java NIO 的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器(Selectors),然后使用一个单独的线程来操作这个选择器,进而“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。
应用程序将向 Selector 对象注册需要它关注的 Channel,以及具体的某一个 Channel 会对哪些 IO 事件感兴趣。Selector 中也会维护一个“已经注册的 Channel”的容器。

Channels
通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据,而且可以同时进行读写。

  • 所有被 Selector(选择器)注册的通道,只能是继承了 SelectableChannel 类的子类。
  • ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用 IO”的端口监听。同时支持 UDP 协议和 TCP 协议。
  • ScoketChannel:TCP Socket 套接字的监听通道,一个 Socket 套接字对应了一个客户端 IP:端口 到 服务器 IP:端口的通信连接。

通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。

buffer 缓冲区
JDK NIO 是面向缓冲的。Buffer 就是这个缓冲,用于和 NIO 通道进行交互。
数据是从通道读入缓冲区,从缓冲区写入到通道中的。以写为例,应用程序都是将数据写入缓冲,再通过通道把缓冲的数据发送出去,读也是一样,数据总是先从通道读到缓冲,应用程序再读缓冲的数据。

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存(其实就是数组)。

这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。
在这里插入图片描述
服务端

public class NioServer {
    private static NioServerHandle nioServerHandle;

    public static void main(String[] args){
        nioServerHandle = new NioServerHandle(DEFAULT_PORT);
        new Thread(nioServerHandle,"Server").start();
    }

}

public class NioServerHandle implements Runnable{

    private volatile boolean started;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    /**
     * 构造方法
     * @param port 指定要监听的端口号
     */
    public NioServerHandle(int port) {
        try {
            /*创建选择器的实例*/
            selector = Selector.open();
            /*创建ServerSocketChannel的实例*/
            serverSocketChannel = ServerSocketChannel.open();

            /*设置通道为非阻塞模式*/
            serverSocketChannel.configureBlocking(false);
            /*绑定端口*/
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            /*注册事件,表示关心客户端连接*/
            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

            started = true;
            System.out.println("服务器已启动,端口号:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(started){
            try {
                /*获取当前有哪些事件*/
                selector.select(1000);
                /*获取事件的集合*/
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
                    如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
                    的键出现,这会导致我们尝试再次处理它。*/
                    iterator.remove();
                    handleInput(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /*处理事件的发生*/
    private void handleInput(SelectionKey key) throws IOException {
        if(key.isValid()){
            /*处理新接入的客户端的请求*/
            if(key.isAcceptable()){
                /*获取关心当前事件的Channel*/
                ServerSocketChannel ssc
                        = (ServerSocketChannel) key.channel();
                /*接受连接*/
                SocketChannel sc = ssc.accept();
                System.out.println("==========建立连接=========");
                sc.configureBlocking(false);
                /*关注读事件*/
                sc.register(selector,SelectionKey.OP_READ);
            }
            /*处理对端的发送的数据*/
            if(key.isReadable()){
                SocketChannel sc = (SocketChannel) key.channel();
                /*创建ByteBuffer,开辟一个缓冲区*/
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                /*从通道里读取数据,然后写入buffer*/
                int readBytes = sc.read(buffer);
                if(readBytes>0){
                    /*将缓冲区当前的limit设置为position,position=0,
                    用于后续对缓冲区的读取操作*/
                    buffer.flip();
                    /*根据缓冲区可读字节数创建字节数组*/
                    byte[] bytes = new byte[buffer.remaining()];
                    /*将缓冲区可读字节数组复制到新建的数组中*/
                    buffer.get(bytes);
                    String message = new String(bytes,"UTF-8");
                    System.out.println("服务器收到消息:"+message);
                    /*处理数据*/
                    String result = Const.response(message);
                    /*发送应答消息*/
                    doWrite(sc,result);

                }else if(readBytes<0){
                    /*取消特定的注册关系*/
                    key.cancel();
                    /*关闭通道*/
                    sc.close();
                }
            }
        }
    }

    /*发送应答消息*/
    private void doWrite(SocketChannel sc,String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();
        sc.write(buffer);
    }


    public void stop(){
        started = false;
    }

}

客户端

public class NioClient {
    private static NioClientHandle nioClientHandle;

    public static void start(){
        nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);
        //nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,8888);
        new Thread(nioClientHandle,"client").start();
    }
    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{
        nioClientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {
        start();
        Scanner scanner = new Scanner(System.in);
        while(NioClient.sendMsg(scanner.next()));
    }
}

public class NioClientHandle implements Runnable{
    private String host;
    private int port;
    private volatile boolean started;
    private Selector selector;
    private SocketChannel socketChannel;

    public NioClientHandle(String ip, int port) {
        this.host = ip;
        this.port = port;

        try {
            /*创建选择器的实例*/
            selector = Selector.open();
            /*创建ServerSocketChannel的实例*/
            socketChannel = SocketChannel.open();
            /*设置通道为非阻塞模式*/
            socketChannel.configureBlocking(false);

            started = true;
        } catch (IOException e) {
            e.printStackTrace();
        }


    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }

        //循环遍历selector
        while(started){
            try{
                //无论是否有读写事件发生,selector每隔1s被唤醒一次
                selector.select(1000);
                //获取当前有哪些事件可以使用
                Set<SelectionKey> keys = selector.selectedKeys();
                //转换为迭代器
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
                    如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
                    的键出现,这会导致我们尝试再次处理它。*/
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null)
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
    }

    //具体的事件处理方法
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            //获得关心当前事件的channel
            SocketChannel sc = (SocketChannel) key.channel();
            //连接事件
            if(key.isConnectable()){
                if(sc.finishConnect()){
                    socketChannel.register(selector,
                        SelectionKey.OP_READ);}
                else System.exit(1);
            }
            //有数据可读事件
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position,position=0,
                    // 用于后续对缓冲区的读取操作
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                }
                //链路已经关闭,释放资源
                else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

    private void doWrite(SocketChannel channel,String request)
            throws IOException {
        //将消息编码为字节数组
        byte[] bytes = request.getBytes();
        //根据数组容量创建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //发送缓冲区的字节数组
        /*关心事件和读写网络并不冲突*/
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException{
        /*非阻塞的连接*/
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            socketChannel.register(selector,SelectionKey.OP_READ);
        }else{
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    //写数据对外暴露的API
    public void sendMsg(String msg) throws Exception{
        doWrite(socketChannel, msg);
    }
}

效果
在这里插入图片描述

操作类型SelectionKey

SelectionKey是一个抽象类,表示selectableChannel在Selector中注册的标识.每个Channel向 Selector 注册时,都将会创建一个 SelectionKey。SelectionKey 将 Channel 与 Selector 建立了关系,并维护了 channel 事件。

可以通过 cancel 方法取消键,取消的键不会立即从 selector 中移除,而是添加到cancelledKeys 中,在下一次 select 操作时移除它.所以在调用某个 key 时,需要使用 isValid 进行校验

操作类型 就绪条件及说明
OP_READ 当操作系统读缓冲区有数据可读时就绪。并非时刻都有数据可读,所以一般需要注册该操作,仅当有就绪时才发起读操作,有的放矢,避免浪费 CPU。
OP_WRITE 当操作系统写缓冲区有空闲空间时就绪。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费 CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。
OP_CONNECT 当 SocketChannel.connect()请求连接成功后就绪。该操作只给客户端使用。
OP_ACCEPT 当接收到一个客户端连接请求时就绪。该操作只给服务器使用。

在这里插入图片描述

Redis 5.0及之前
单线程Reactor模式

在这里插入图片描述
Redis 6.0
流程
1.主线程负责接受建立连接请求,获取socket放入全局等待读处理队列
2.主线程处理完读事件之后,通过RR(Round Robin )将这些连接分配给这些IO线程
3.主线程阻塞等待IO线程读取Socket完毕
4.主线程通过单线程的方式执行请求命令,请求数据读取并解析完成,但并不执行回写Socket
5.主线程阻塞等待IO线程将数据回写Socket完毕
6.解除绑定,清空等待队列

Buffer 的分配

要想获得一个 Buffer 对象首先要进行分配。 每一个 Buffer 类都有 allocate 方法(可以在堆上分配,也可以在直接内存上分配)。
分配 48 字节 capacity 的 ByteBuffer 的例子:ByteBuffer buf = ByteBuffer.allocate(48);
分配一个可存储 1024 个字符的
CharBuffer:CharBuffer buf = CharBuffer.allocate(1024);
wrap 方法:把一个 byte 数组或 byte 数组的一部分包装成 ByteBuffer:
ByteBuffer wrap(byte [] array)
ByteBuffer wrap(byte [] array, int offset, int length)

直接内存

直接内存又叫堆外内存,并不是虚拟机运行时数据区的一部分,也不是java虚拟机规范中定义的内存区域。

在JDK1.4中新加入了NIO,引入了一种基于通道(channel)与缓冲区(Buffer)的IO方式,可以使用native函数直接分配堆外内存,然后通过一个存储在java堆中的DirerctByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,避免了子啊java堆和native对中来回复制数据

TCP缓冲区

每个TCP的Socket的内核中都有一个发送缓冲区(SO_SNDBUF)和一个接受缓冲区(SO_RECVBUF)
在这里插入图片描述
在这里插入图片描述

HeapByteBuffer 与 DirectByteBuffer,在原理上,前者可以看出分配的 buffer 是在 heap区域的,其实真正 flush 到远程的时候会先拷贝到直接内存,再做下一步操作;在 NIO 的框架下,很多框架会采用 DirectByteBuffer 来操作,这样分配的内存不再是在 java heap 上,经过性能测试,可以得到非常快速的网络交互,在大量的网络交互下,一般速度会比HeapByteBuffer 要快速好几倍。

直接内存(Direct Memory)并不是虚拟机运行时数据区的一部分,也不是 Java 虚拟机规范中定义的内存区域,但是这部分内存也被频繁地使用,而且也可能导致OutOfMemoryError 异常出现。

NIO 可以使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆里面的DirectByteBuffer 对象作为这块内存的引用进行操作。这样能在一些场景中显著提高性能,因为避免了在 Java 堆和 Native 堆中来回复制数据。

直接内存(堆外内存)与堆内存比较

1.直接内存申请空间耗费更高的性能,当频繁申请到一定量时尤为明显
2.直接内存IO读写的性能要优于普通的堆内存,在多次读写操作的情况下差异明显
3.本机直接内存的分配不会收到java堆大小的限制,收到本机总内存大小限制
4.配置虚拟机参数时,不要忽略直接内存 防止出现OOm的异常

直接内存比堆内存相比,避免了二次拷贝
其次,堆中有GC清理,导致数据会有移动
在这里插入图片描述

相关推荐

  1. 拷贝技术

    2024-06-12 20:56:03       36 阅读
  2. 什么是拷贝

    2024-06-12 20:56:03       12 阅读
  3. 13.拷贝

    2024-06-12 20:56:03       8 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-12 20:56:03       17 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-12 20:56:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-12 20:56:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-12 20:56:03       18 阅读

热门阅读

  1. 全面解析C++对象的向上和向下类型转换”

    2024-06-12 20:56:03       7 阅读
  2. Web前端开发海报:揭示前端设计的魅力与技巧

    2024-06-12 20:56:03       10 阅读
  3. Anconda环境迁移

    2024-06-12 20:56:03       7 阅读
  4. 单调队列 加 二分

    2024-06-12 20:56:03       6 阅读
  5. 后仿真中的反标 SDF 警告信息汇总

    2024-06-12 20:56:03       5 阅读