【Netty】Netty的使用和常用组件详解

目录

一、简述

1.1 什么是Netty

1.2 Netty 的优势

1.3 为什么不用 Netty5?

1.4 为什么 Netty 使用 NIO 而不是 AIO?

1.5 为什么不用 Mina?

二、第一个 Netty 程序

2.1 Bootstrap、EventLoop(Group) 、Channel

2.1.1 Bootstrap

2.1.2 Channel

2.1.3 EventLoop

2.2 事件和 ChannelHandler、ChannelPipeline

2.2.1 事件

2.2.2 ChannelHandler类

2.2.3 ChannelPipeline

2.3 ChannelFuture

2.4 Hello,Netty!第一个 Netty 程序

三、Netty 组件详解

3.1 EventLoop 和 EventLoopGroup

3.1.1 线程的分配

3.1.2 线程管理

3.2 Channel、EventLoop(Group)和 ChannelFuture

3.2.1 Channel 接口

3.2.1.1 Channel 的生命周期状态

3.2.1.2 重要 Channel 的方法

3.3 ChannelPipeline 和 ChannelHandlerContext

3.3.1 ChannelPipeline 接口

3.3.2 ChannelHandler

3.3.2.1 ChannelHandler的生命周期

3.3.2.2 ChannelPipeline中的 ChannelHandler

3.3.3 ChannelPipeline上的方法

3.3.4 ChannelHandlerContext

3.3.4.1 Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播

3.3.4.2 ChannelHandlerContext 的 API

3.4 ChannelHandler

3.4.1 ChannelHandler 接口

3.4.2 ChannelInboundHandler 接口

3.4.3 ChannelOutboundHandler 接口

3.4.4 ChannelHandler 的适配器

3.4.5 Handler 的共享和并发安全性

3.4.6 资源管理和 SimpleChannelInboundHandler

3.5 内置通信传输模式

NIO和Epoll通信传输模式的对比

3.6 Bootstrap(引导类/启动类)

3.7 ChannelInitializer

3.8 ChannelOption

3.8.1 ChannelOption.SO_BACKLOG

3.8.2 ChannelOption.SO_REUSEADDR

3.8.3 ChannelOption.SO_KEEPALIVE

3.8.4 ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF

3.8.5 ChannelOption.SO_LINGER

3.8.6 ChannelOption.TCP_NODELAY

3.9 ByteBuf缓冲区类

3.9.1 使用模式

3.9.1.1 堆缓冲区

3.9.1.2 直接缓冲区

3.9.1.3 复合缓冲区

3.9.2 分配

3.9.2.1 ByteBufAllocator 接口

3.9.2.2 Unpooled 缓冲区

3.9.3 随机访问索引/顺序访问索引/读写操作

3.9.4 可丢弃字节

3.9.5 可读字节

3.9.6 可写字节

3.9.7 索引管理

3.9.8 查找操作

3.9.9 派生缓冲区

3.9.10 引用计数

3.9.11 工具类

3.9.12 资源释放

四、解决粘包和半包

4.1 什么是 TCP 粘包和半包?

4.2 TCP 粘包/半包发生的原因

4.2.1 TCP粘包的出现原因

4.2.2 TCP半包的出现原因

4.3 解决粘包/半包的方案

方案一:在包尾增加分割符

1、回车换行符进行分割

2、自定义分割符进行分割

方案二:消息定长

方案三:消息头中包含表示消息总长度(或者消息体长度) 的字段

4.4 辨析 channelRead 和 channelReadComplete

五、编解码器框架

5.1 什么是编解码器

5.2 解码器

5.2.1 将字节数据解码为消息

5.2.2 将一种消息类型解码为另一种

5.2.3 TooLongFrameException

5.3 编码器

5.3.1 将消息编码为字节数据

5.3.2 将一种消息编码为另一种消息

5.4 编解码器类

5.5 实战:实现 SSL/TLS 和 Web 服务

5.5.1 通过 SSL/TLS 保护 Netty 应用程序

5.5.2 HTTP 系列

5.5.2.1 聚合 HTTP 消息

5.5.2.2 HTTP 压缩

5.5.2.3 使用 HTTPS

5.5.3 实现步骤

六、序列化问题

6.1 Java 序列化的缺点

1、无法跨语言

2、序列化后的码流太大

3、序列化性能太低

6.2 如何选择序列化框架

6.2.1 选择四要点

6.2.1 序列化框架比较

6.3 LengthFieldBasedFrame 详解


一、简述

1.1 什么是Netty

Netty 是基于 Java NIO 的异步事件驱动的网络应用框架,它是目前Java体系中知名、最牛逼的网络通讯框架。使用 Netty 可以快速开发网络应用,实现网络通讯(Netty并不是一个中间件,但是可以用它来开发中间件)。Netty 提供了高层次的抽象来简化 TCP 和 UDP 服务器的编程(简化网络编程),但是你仍然可以使用底层的 API。

🛈

信息

要区分好框架和中间件,框架是用来开发应用的,而中间件是服务于应用的组件,很多中间件可以实现集群部署。

Netty 的内部实现是很复杂的,但是 Netty 提供了简单易用的API从网络处理代码中解耦业务逻辑。Netty 是完全基于 NIO 实现的,所以整个 Netty 都是异步的。

Netty 是最流行的 NIO 框架,它已经得到成百上千的商业、商用项目验证,许多框架和开源组件的底层 rpc 都是使用的 Netty,如 Dubbo、Elasticsearch 等等。

下面是官网给出的一些 Netty 的特性:

设计方面

  • 对各种传输协议提供统一的 API(使用阻塞和非阻塞套接字时候使用的是同一个 API,只是需要设置的参数不一样)。
  • 基于一个灵活、可扩展的事件模型来实现关注点清晰分离。
  • 高度可定制的线程模型——单线程、一个或多个线程池。
  • 真正的无数据报套接字(UDP)的支持(since 3.1)。

易用性

  • 完善的 Javadoc 文档和示例代码。
  • 不需要额外的依赖,JDK 5 (Netty 3.x) 或者 JDK 6 (Netty 4.x) 已经足够。

性能

  • 更好的吞吐量,更低的等待延迟。
  • 更少的资源消耗。
  • 最小化不必要的内存拷贝。

安全性

  • 完整的 SSL/TLS 和 StartTLS 支持

Netty已经非常稳定了,近三年都是进行的小版本号的更新,更新的程度已经不大了。所以本次课程以 Netty 4.1.42.Final 版本进行讲解。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final </version>
    <scope>compile</scope>
</dependency>

1.2 Netty 的优势

  1. API 使用简单,开发门槛低;
  2. 功能强大,预置了多种编解码功能,支持多种主流协议;
  3. 定制能力强,可以通过 ChannelHandler 对通信框架进行灵活地扩展;
  4. 性能高,通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优;
  5. 成熟、稳定,Netty 修复了已经发现的所有 JDK NIO BUG,业务开发人员不需要再为 NIO 的 BUG 而烦恼;
  6. 社区活跃,版本迭代周期短,发现的 BUG 可以被及时修复,同时,更多的新功能会加入;
  7. 经历了大规模的商业应用考验,质量得到验证。

1.3 为什么不用 Netty5

因为Netty5引入了AIO,还将线程模型改成了Fork/Join。但是研发人员发现这样并没有提高Netty的性能,反而使Netty的开发者和使用者(使用AIO是异步编程,异步编程就需要回调,回调太多了就很头痛)的工作难度都变大了,所以就干脆放弃Netty5了。从2022年之后就再也没有更新过了,所以目前Netty5 已经停止开发了。

1.4 为什么 Netty 使用 NIO 而不是 AIO

Netty 不看重 Windows 上的使用,在 Linux 系统上,AIO 的底层实现仍使用 EPOLL,Linux的AIO其实是伪AIO,底层仍然是NIO。它没有很好地实现 AIO,因此在性能上没有明显的优势,而且被 JDK 封装了一层不容易深度优化。

AIO 还有个缺点是接收数据需要预先分配缓存,而不是 NIO 那种需要接收时才需要分配缓存,所以对连接数量非常大但流量小的情况, 内存浪费很多。

而且 Linux 上 AIO 不够成熟,处理回调结果速度跟不上处理需求。

作者原话:

Not faster than NIO (epoll) on unix systems (which is true)

There is no daragram suppport

Unnecessary threading model (too much abstraction without usage)

【在 unix 系统上不比 NIO (epoll) 快(这是真的)

没有 daragram 支持

不必要的线程模型(过多的抽象而不使用)】

1.5 为什么不用 Mina

Mina和Netty是同一个作者。简单来说,Mina 几乎不再更新了,Netty 本来就是因为 Mina 不够好所以开发出来的。

二、第一个 Netty 程序

在编写Netty程序之前,我们先来简单了解一下Netty框架中需要用到的组件定义。后续我们还会再针对这里面的关键组件做详细分析。

2.1 BootstrapEventLoop(Group) Channel

2.1.1 Bootstrap

Bootstrap 是 Netty 框架的启动类和主入口类,分为客户端类 Bootstrap 和服务器类 ServerBootstrap 两种。

2.1.2 Channel

Channel 是 Java NIO 的一个基本构造。

它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的 I/O 操作的程序组件)的开放连接,如读操作和写操作。

目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

服务端会为每个新接受的连接创建一个 Channel,并将该 Channel 的处理器设置为指定的 ChannelHandler。客户端也是同理,会给与服务端的连接创建一个Channel,Channel也会有对应的ChannelHandler处理器。

我感觉之前学JDK NIO时Channel的概念,和Netty中这个Channel的概念基本类似,作用应该是一样的。其实就可以把Channel简单理解为是一个Socket,但其实Channel是构建在Socket上层的东西,通过它就不需要直接操作Socket了,而是通过操作Channel来间接操作Socket,Channel又在Socket基础上拓展了更多的功能。

2.1.3 EventLoop

EventLoop 暂时可以看成一个线程,专门用来做时间的循环处理的。对Channel发过来的事件进行循环处理。

EventLoopGroup 自然就可以看成线程组。多个线程就是通过EventLoop Group进行管理。

2.2 事件和 ChannelHandlerChannelPipeline

2.2.1 事件

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。

Netty 事件是按照它们与入站或出站数据流的相关性进行分类的:

  • 可能由入站数据或者相关的状态更改而触发的事件包括: 连接已被激活或者连接失活; 数据读取;用户事件;错误事件。
  • 出站事件是未来将会触发的某个动作的操作结果,这些动作包括:打开或者关闭到远程节点的连接;将数据写到或者冲刷到套接字。

2.2.2 ChannelHandler

每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法,既然事件分为入站出站,用来处理事件的 ChannelHandler 也被分为可以处理入站事件的 Handler出站事件的 Handler,当然有些 Handler 既可以处理入站也可以处理出站。

Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议 (如 HTTP 和 SSL/TLS)的 ChannelHandler。

2.2.3 ChannelPipeline

基于 Netty 的网络应用程序中根据业务需求会使用 Netty 已经提供的 ChannelHandler 或者自行开发 ChannelHandler,这些 ChannelHandler 都放在 ChannelPipeline 中统一管理,事件就会在 ChannelPipeline 中流动,并被其中一个或者多个 ChannelHandler 处理。

结构图:

这些ChannelHandler就是责任链模式,把同一个数据进行一层层地处理。

2.3 ChannelFuture

Netty 中所有的 I/O 操作都是异步的,我们知道“异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等”,那就是说至少我们需要一种获得异步执行结果的手段。

JDK 预置了 interface java.util.concurrent.Future,Future 提供了一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty 提供了它自己的实现 ChannelFuture,用于在执行异步操作的时候使用。

一般来说,每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture

2.4 HelloNetty!第一个 Netty 程序

服务端:

/**
 * 类说明:基于Netty的服务器
 */
public class EchoServer {
    private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
    // 服务端要监听的端口号
    private final int port;
    /**
     * 构造方法
     */
    public EchoServer(int port) {
        // 初始化监听端口
        this.port = port;
    }
    public static void main(String[] args) throws InterruptedException {
        int port = 9999;
        // 创建服务端对象
        EchoServer echoServer = new EchoServer(port);
        LOG.info("服务器即将启动");
        // 启动服务端
        echoServer.start();
        LOG.info("服务器关闭");
    }
    /**
     * 启动服务端
     * @throws InterruptedException
     */
    public void start() throws InterruptedException {
        // 创建服务端处理器对象
        final EchoServerHandler serverHandler = new EchoServerHandler();
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 服务端启动必备  ServerBootstrap是Netty框架服务端的启动类和主入口类
            ServerBootstrap b = new ServerBootstrap();
            
            // 构建服务端启动类,配置ServerBootstrap的各种参数
            b.group(group) // 将ServerBootstrap与线程组绑定,这些线程将用来做事件处理的工作
             .channel(NioServerSocketChannel.class) // 该方法用于指定要使用的 Channel 实现类。在服务器端的 ServerBootstrap 中,通过.channel()方法可以指定服务器监听的套接字通道的类型。
                                                    // NioServerSocketChannel就实现的Channel接口,通过该类指定服务端使用NIO的通信模式。这个类负责接收客户端连接,并生成一个socket与客户端进行通讯。
             .localAddress(new InetSocketAddress(port)) // 指定监听端口
             //.childOption()
             //.handler();
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(serverHandler);
                }
             }); // 添加Handler用于处理事件,加入到pipeline中。注意服务端的Handler是添加到子Handler中的,使用的是childHandler()方法
            
            // b.bind()用于启动服务器
            // bind()绑定服务器方法是异步的,不会阻塞,但是如果这里不阻塞,因为这个是Netty启动方法,会导致可能还没绑定完成呢,程序就接着往下执行,程序就结束了
            // 所以这里又加了一个sync()方法,将程序阻塞在这里,等待绑定完成
            ChannelFuture f = b.bind().sync(); // 异步绑定到服务器,sync()会阻塞到完成
            /*
             * 在这里,f 是通过 b.bind().sync() 绑定服务器后返回的 ChannelFuture 对象。
             * 通过f来调用 channel() 方法,可以获取到与这个 ChannelFuture 关联的 Channel 对象。
             * 然后调用 closeFuture() 方法获取到这个 Channel 对象的关闭操作的 ChannelFuture 对象,
             * 最后调用 sync() 方法使当前线程等待,直到这个关闭操作完成(即 Channel 关闭)。
             * 这样做是为了保证在 Channel 关闭之前,程序不会继续执行下去,从而保证 Channel 的正常关闭。
             */
            f.channel().closeFuture().sync(); // 阻塞当前线程,直到服务器的ServerChannel被关闭
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}


/**
 * ChannelHandler事件处理器。需要继承ChannelInboundHandlerAdapter类
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 用于处理从客户端接收到的消息。
     * @param ctx 提供了访问 ChannelPipeline 和 ChannelHandler 等组件的方法
     * @param msg 接收到客户端发来的消息对象
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // channel和socket之间就要有一个缓冲区,这里Netty自己实现了ByteBuf
        // 将客户端发过来的数据放入缓冲区ByteBuf
        ByteBuf in = (ByteBuf) msg;
        // 输出缓冲区中的数据
        System.out.println("Server accept: " + in.toString(CharsetUtil.UTF_8));
        // 将接收到的消息写回给客户端
        // writeAndFlush()方法将数据写入到Channel中(准确点说就是刷新到Netty的缓冲区中),并立即刷新Socket上,传递给远程节点。这里相当于将客户端发送的消息原封不动地返回给客户端。
        ctx.writeAndFlush(in);
    }
    /**
     * 当网络连接出现异常的时候执行的处理
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        // 释放ChannelHandlerContext资源
        ctx.close();
    }
}

客户端:

/**
 * 类说明:基于Netty的客户端
 */
public class EchoClient {
    private final int port;
    private final String host;

    /*
     * 绑定客户端要监听的服务端IP和端口
     */
    public EchoClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    /**
     * 启动客户端
     */
    public void start() throws InterruptedException {
        // 线程组,这些线程用于处理发送给客户端的事件
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Bootstrap是Netty框架的客户端启动类。客户端启动必备
            Bootstrap b = new Bootstrap();
            
            b.group(group) // 绑定线程组
             .channel(NioSocketChannel.class) // 指定使用NIO的通信模式
             .remoteAddress(new InetSocketAddress(host, port)) // 指定服务器的IP地址和端口
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 给Channel添加对应的事件处理器
                    ch.pipeline().addLast(new EchoClientHandler());
                }
            }); // 添加事件处理器,注意客户端添加事件处理器使用的是handler()方法
            // 通过 connect()方法连接到服务器,sync()会将线程阻塞在这里,直到连接成功,
            ChannelFuture f = b.connect().sync(); // 异步连接到服务器,sync()会阻塞到完成
            // 连接成功好就通过 closeFuture().sync() 方法阻塞当前线程,直到客户端的 Channel 被关闭。
            f.channel().closeFuture().sync(); // 阻塞当前线程,直到客户端的Channel被关闭
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 创建客户端对象
        new EchoClient(9999, "127.0.0.1").start();
    }
}


/**
 * 客户端事件处理器。要继承SimpleChannelInboundHandler类
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 读取到网络数据后进行业务处理
     * @param ctx 提供了访问 ChannelPipeline 和 ChannelHandler 等组件的方法
     * @param msg 服务端传过来的数据
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // 打印收到的数据
        System.out.println("client Accept" + msg.toString(CharsetUtil.UTF_8));
        // 释放资源
        ctx.close();
    }

    /**
     * channel活跃后,做业务处理
     * 连接建立成功后会执行该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // writeAndFlush()方法用于向对端发送消息,相当于将数据先写入到Netty缓冲区,然后再刷新到Socket中完成数据发送。
        // 通过Unpooled.copiedBuffer()将要发送的消息添加到ByteBuf类中
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,Netty", CharsetUtil.UTF_8));
    }
}

 

三、Netty 组件详解

下面我们再针对上一节用到的Netty组件进行详细讲解。

3.1 EventLoop EventLoopGroup

在一个 while 循环中 select 出事件,然后依次处理每种事件。我们可以把它称为事件循环,这就是 EventLoop。interface io.netty.channel.EventLoop 定义了 Netty 的核心抽象,用于处理网络连接的生命周期中所发生的事件

io.netty.util.concurrent包构建在 JDK 的 java.util.concurrent 包上。而 io.netty.channel 包中的类,为了与 Channel 的事件进行交互,扩展了这些接口/类。一个 EventLoop 将由一个永远都不会改变的 Thread 驱动,同时任务(Runnable 或者 Callable)可以直接提交给 EventLoop 实现,以立即执行或者调度执行。就可以简单理解为一个EventLoop就是一个线程,但实际上EventLoop远远不止是一个线程这么简单,它内部还有很多别的东西。

每一个EventLoop中有一个线程成员属性,还有一个队列,这个队列就是这个线程的任务队列,有点类似于线程池,只是EventLoop只有一个线程。

3.1.1 线程的分配

服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。

异步传输实现只使用了少量的 EventLoop(以及和它们相关联的 Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来支撑大量的 Channel,而不是为每个 Channel 都创建一个 Thread(多路复用)。EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且同一个的 EventLoop 可能会被分配给多个 Channel。

一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个 EventLoop(以及相关联的 Thread)。例如NioServerSocketChannel在整个生命周期中也只会有对应的同一个EventLoop对其进行管理。

需要注意,EventLoop 的分配方式对 ThreadLocal 的使用的影响。因为一个 EventLoop 通常会被用于支撑多个 Channel,所以对于所有相关联的 Channel 来说,ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然可以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。

3.1.2 线程管理

当通过一个Channel提交事件任务的时候,如果当前调用的线程正好是支撑该EventLoop(该Channel对应的EventLoop)的线程,那么所提交的代码块将会被直接执行。否则,EventLoop 将把该任务放入到内部任务队列中,以便稍后执行。当该EventLoop下次处理它的事件时,它就会执行任务队列中的那些任务/事件。

必须判断当前线程是EventLoop绑定的线程才能执行该Channel的任务操作,这是为了避免并发冲突。因为EventLoop中可能会有一些共享变量,不能让多个不同线程同时操作。Netty不采用加锁来保证并发安全,这也就保证了Netty的高性能特性。Netty内部大量使用了CAS和volatile。

只要是想执行Channel中的相关操作,必须由Channel对应的EventLoop绑定的线程去执行,可以避免并发安全问题。

3.2 ChannelEventLoop(Group) ChannelFuture

Netty 网络抽象的代表:

  • Channel——Socket;
  • EventLoop——控制流、多线程处理、并发;
  • ChannelFuture——异步通知。

Channel 和 EventLoop 关系如图:

从图上我们可以看出 Channel 需要被注册到某个 EventLoop 上,在 Channel 整个生命周期内都由这个EventLoop处理IO事件,也就是说一个Channel和一个EventLoop进行了绑定, 但是一个EventLoop可以同时被多个Channel绑定。这一点在“EventLoop和EventLoopGroup” 小节里也提及过。

3.2.1 Channel 接口

基本的 I/O 操作(bind() 将Socket绑定到指定端口、connect() 发起连接请求、read() 读操作和 write() 写操作)依赖于底层网络传输所提供的原语。在基于 Java 的网络编程中,其基本的构造是Socket类。Netty 的 Channel 接口所提供 的 API,被用于所有的 I/O 操作。大大地降低了直接使用 Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。其实我们就可以把Channel近似理解为是一个Socket,但其实Channel是构建在Socket上层的东西,通过它就不需要直接操作Socket了,而是通过操作Channel来间接操作Socket,Channel又在Socket基础上拓展了更多的功能

由于 Channel 是独一无二的,所以为了保证它的顺序性,就将 Channel接口声明为 java.lang.Comparable 的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo()方法的实现将会抛出一个Error。

3.2.1.1 Channel 的生命周期状态
  1. ChannelUnregistered :Channel 已经被创建,但还未注册到 EventLoop
  2. ChannelRegistered :Channel 已经被注册到了 EventLoop
  3. ChannelActive :Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
  4. ChannelInactive :Channel 没有连接到远程节点

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。在程序员的开发编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。

3.2.1.2 重要 Channel 的方法
  • eventLoop(): 返回分配给 Channel 的 EventLoop
  • pipeline(): 返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的 ChannelPipeline
  • isActive(): 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的。
  • localAddress(): 返回本地的 SokcetAddress
  • remoteAddress(): 返回远程的 SocketAddress
  • write(): 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正写往 socket
  • flush(): 将之前已写的数据冲刷到底层 socket 进行传输
  • writeAndFlush(): 一个简便的方法,等同于调用 write()并接着调用 flush()

3.3 ChannelPipeline ChannelHandlerContext

3.3.1 ChannelPipeline 接口

当 Channel 被创建时,它将会被自动地分配一个新的 ChannelPipeline,每个 Channel 都有自己的 ChannelPipeline。这项关联是永久性的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站(也就是从网络到业务处理)和 出站(也就是从业务处理到网络)各种事件流的API,我们代码中的 ChannelHandler 都是放在 ChannelPipeline 中的。

3.3.2 ChannelHandler

使事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些 ChannelHandler 对象接收事件、执行它们所实现的处理逻辑, 并将数据传递给链中的下一个 ChannelHandler,而且 ChannelHandler 对象也完全可以拦截事件不让事件继续传递。它们的执行顺序是由它们被添加的顺序所决定的。 这就是一个典型的责任链模式

3.3.2.1 ChannelHandler的生命周期

在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用下面这些方法。这些方法的入参都会接受一个 ChannelHandlerContext 参数:

  • handlerAdded():当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
  • handlerRemoved():当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught():当处理过程中在 ChannelPipeline 中有错误产生时被调用
3.3.2.2 ChannelPipeline中的 ChannelHandler

入站和出站 ChannelHandler 被安装到同一个 ChannelPipeline 中,ChannelPipeline 以双向链表的形式进行维护管理。比如下图,我们在网络上传递的数据,要求加密,但是加密后密文比较大,需要压缩后再传输,而且按照业务要求,需要检查报文中携带的用户信息是否合法,于是我们实现了 5 个 Handler:解压(入)Handler、压缩(出)handler、解密(入)Handler、加密(出)Handler、授权(入)Handler。

如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,但是只被处理入站事件的 Handler 处理,也就是解压(入)Handler、解密(入)Handler、 授权(入) Handler,最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结束了。

数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从链的尾端开始流动,但是只被处理出站事件的 Handler 处理,也就是加密(出) Handler、 压缩(出)handler,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层, 也就是我们的 Socket。

Netty 能区分入站事件的 Handler 和出站事件的 Handler,并确保数据只会在具有相同定向类型的两个 ChannelHandler 之间传递。

所以在我们编写 Netty 应用程序时要注意,分属出站和入站不同的 Handler ,在业务没特殊要求的情况下是无所谓顺序的,正如我们下面的图所示,比如‘压缩(出)handler‘可以放在‘解压(入)handler‘和‘解密(入) Handler‘中间,也可以放在‘解密(入) Handler ‘和‘授权(入) Handler‘之间。

而同属一个方向的 Handler 则是有顺序的,因为上一个 Handler 处理的结果往往是下一 个 Handler 的要求的输入。比如入站处理,对于收到的数据,只有先解压才能得到密文,才能解密,只有解密后才能拿到明文中的用户信息进行授权检查,所以解压->解密->授权这个 三个入站 Handler 的顺序就不能乱。

上图中出站Handler自成一串,入站Handler自成一串。

3.3.3 ChannelPipeline上的方法

既然 ChannelPipeline 以双向链表的形式进行维护管理 Handler,自然也提供了对应的方法在 ChannelPipeline 中增加或者删除、替换 Handler。主要的方法如下:

  • addFirst、addBefore、addAfter、addLast:将一个 ChannelHandler 添加到 ChannelPipeline 中
  • remove:将一个 ChannelHandler 从 ChannelPipeline 中移除
  • replace:将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
  • get:通过类型或者名称返回 ChannelHandler
  • context:返回和 ChannelHandler 绑定的 ChannelHandlerContext
  • names:返回 ChannelPipeline 中所有 ChannelHandler 的名称

ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。

3.3.4 ChannelHandlerContext

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext,为什么需要这个 ChannelHandlerContext ?前面我们已经说过,ChannelPipeline 以双向链表的形式进行维护管理 Handler,毫无疑问,Handler 在放入ChannelPipeline 的时候必须要有两个指针 pre 和 next 来说明它的前一个元素和后一个元素,但是 Handler 本身来维护这两个指针合适 吗?想想我们在使用 JDK 的 LinkedList 的时候,我们放入 LinkedList 的数据是不会带这两个指针的,LinkedList 内部会用类 Node 对我们的数据进行包装,而类 Node 则带有两个指针 pre 和 next。

所以,ChannelHandlerContext 的主要作用就和 LinkedList 内部的类 Node 类似。要让专业的人干专业的事,不要把所有的功能都冗杂在一起,直接让ChannelHandlerContext来负责维护链表关系即可。ChannelHandler只关注于处理事件。ChannelHandlerContext就相当于ChannelHandler的上下文。每一个ChannelHandlerContext会对应一个ChannelHandler

不过 ChannelHandlerContext 不仅仅只是个包装类,它还提供了很多的方法,比如让事件从当前 ChannelHandler 传递给链中的下一个 ChannelHandler,还可以被用于获取底层的 Channel,还可以用于写出站数据。

3.3.4.1 ChannelChannelPipeline ChannelHandlerContext 上的事件传播

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 Channel-Pipeline 本身上,但是有一点重要的不同。如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播(从出站或者入站的第一个Handler开始传播)。而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中当前所关联ChannelHandler的下一个(入站下一个,出站上一个)能够处理该事件的 ChannelHandler。

我们用一个实际例子来说明,比如服务器收到对端发过来的报文,解压后需要进行解密, 结果解密失败,要给对端一个应答。

如果发现解密失败原因是服务器和对端的加密算法不一致,假如应答报文只能以明文的压缩格式发送,就可以在解密 handler 中直接使用 ctx.write 给对端应答,这样应答报文就只经过压缩 Handler 就发往了对端;如果使用channel或者channelpipeline中的wirite()方法, 就还需要流经所有的出站Handler才能发往对端,很多流经的Handler都是没必要的。

其他情况下,假如应答报文要以加密和压缩格式发送,就可以在解密 handler 中使用 channel.write()或者 channelpipeline.write()给对端应答,这样应答报文就会流经整个出站处理过程。

3.3.4.2 ChannelHandlerContext API
  • alloc 返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator
  • bind 绑定到给定的 SocketAddress,并返回 ChannelFuture
  • channel 返回绑定到这个实例的 Channel
  • close 关闭 Channel,并返回 ChannelFuture
  • connect 连接给定的 SocketAddress,并返回 ChannelFuture
  • deregister 从之前分配的 EventExecutor 注销,并返回 ChannelFuture
  • disconnect 从远程节点断开,并返回 ChannelFuture
  • executor 返回调度事件的 EventExecutor
  • fireChannelActive 触发对下一个 ChannelInboundHandler 上的 channelActive()方法(已连接)的调用 fireChannelInactive 触发对下一个 ChannelInboundHandler 上的 channelInactive()方法 (已关闭)的调用
  • fireChannelRead 触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接收的消息)的调用
  • fireChannelReadComplete 触发对下一个 ChannelInboundHandler 上的 channelReadComplete()方法的调用
  • fireChannelRegistered 触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered()方法的调用
  • fireChannelUnregistered 触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered()方法的调用
  • fireChannelWritabilityChanged 触发对下一个 ChannelInboundHandler 上的 fireChannelWritabilityChanged()方法的调用
  • fireExceptionCaught 触发对下一个 ChannelInboundHandler 上的 fireExceptionCaught(Throwable)方法的调用
  • fireUserEventTriggered 触发对下一个 ChannelInboundHandler 上的 fireUserEventTriggered(Object evt)方法的调用
  • handler 返回绑定到这个实例的 ChannelHandler
  • isRemoved 如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true
  • name 返回这个实例的唯一名称
  • pipeline 返回这个实例所关联的 ChannelPipeline
  • read 将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的 channelReadComplete(ctx)方法
  • write 通过这个实例写入消息并经过 ChannelPipeline
  • writeAndFlush 通过这个实例写入并冲刷消息并经过 ChannelPipeline

当使用 ChannelHandlerContext 的 API 的时候,有以下两点:

  1. ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的, 所以缓存对它的引用是安全的;
  2. 相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。 

3.4 ChannelHandler

在前面讲解ChannelPipeline的时候已经简单介绍过ChannelHandler了,下面我们就再更加深入地详细讲解一下。

3.4.1 ChannelHandler 接口

从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 的方法是由网络事件触发的。 事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。

举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处理。 当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 直接冲刷数据然后输出到对端。应用程序的业务逻辑通常实现在一个或者多个 ChannelInboundHandler 中。

这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被应用程序的业务逻辑所处理。

Netty 定义了下面两个重要的 ChannelHandler 子接口:

  • ChannelInboundHandler——处理入站数据以及各种状态变化;
  • ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。

3.4.2 ChannelInboundHandler 接口

下面列出了接口 ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。正如我们前面所提到的,这些方法和 Channel 的生命周期密切相关。

API

  • channelRegistered:当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
  • channelUnregistered:当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
  • channelActive:当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
  • channelInactive:当 Channel 离开活动状态并且不再连接它的远程节点时被调用
  • channelReadComplete:当 Channel 上的一个读操作完成时被调用
  • channelRead:当从 Channel 读取数据时被调用
  • ChannelWritabilityChanged:当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法 来检测 Channel 的可写性。与可写性相关的阈值可以通过 Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来设置
  • userEventTriggered:当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用。

注意:channelReadComplete 和 channelRead 这两个方法非常让人搞不清两者的区别是什么,我们先放下这个疑问,后面会有解释。

3.4.3 ChannelOutboundHandler 接口

出站操作和数据将由 ChannelOutboundHandler 处理。它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。

所有由 ChannelOutboundHandler 本身所定义的方法:

  • bind(ChannelHandlerContext,SocketAddress,ChannelPromise):当请求将 Channel 绑定到本地地址时被调用
  • connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise):当请求将 Channel 连接到远程节点时被调用
  • disconnect(ChannelHandlerContext,ChannelPromise):当请求将 Channel 从远程节点断开时被调用
  • close(ChannelHandlerContext,ChannelPromise):当请求关闭 Channel 时被调用
  • deregister(ChannelHandlerContext,ChannelPromise):当请求将 Channel 从它的 EventLoop 注销时被调用
  • read(ChannelHandlerContext):当请求从 Channel 读取更多的数据时被调用
  • flush(ChannelHandlerContext):当请求通过 Channel 将入队数据冲刷到远程节点时被调用
  • write(ChannelHandlerContext,Object,ChannelPromise):当请求通过 Channel 将数据写到远程节点时被调用

3.4.4 ChannelHandler 的适配器

有一些适配器类可以将编写自定义的 ChannelHandler 所需要的工作降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。因为你有时会忽略那些不感兴趣的事件,所以 Netty 提供了抽象基类 ChannelInboundHandlerAdapter(处理入站)ChannelOutboundHandlerAdapter(处理出站)

我们可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和 ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们获得了它们共同的超接口 ChannelHandler 的方法。

不过 ChannelOutboundHandler 有个非常让人迷惑的 read 方法,ChannelOutboundHandler 不是处理出站事件的吗?怎么会有 read 方法呢?其实这个 read 方法不是表示读数据,而是表示业务发出了读(read)数据的要求,这个要求也会封装为一个事件进行传播,这个事件因为是业务发出到网络的,自然就是个出站事件,而且这个事件触发的就是 ChannelOutboundHandler 中 read 方法。

如果我们的 Handler 既要处理入站又要处理出站怎么办呢?这个时候就可以使用类 ChannelDuplexHandler,当然也可以同时实现 ChannelOutboundHandler, ChannelInboundHandler 这两个接口,自然就要麻烦很多了。

3.4.5 Handler 的共享和并发安全性

ChannelHandlerAdapter 还提供了实用方法 isSharable()。如果其对应的实现被标注为 Sharable,那么这个方法将返回 true,表示它可以被添加到多个 ChannelPipeline。

这就牵涉到了我们实现的 Handler 的共享性和线程安全性。回顾我们的 Netty 代码,在往 pipeline 安装 Handler 的时候,我们基本上是 new 出 Handler 的实例

因为每个 socketChannel 都有自己的 pipeline,而且每个 socketChannel 又是和线程绑定的, 所以这些 Handler 的实例之间完全独立的,只要 Handler 的实例之间不是共享了全局变量, Handler 的实例是线程安全的。

但是如果业务需要我们在多个 socketChannel 之间共享一个 Handler 的实例怎么办呢? 比如统计服务器接受到和发出的业务报文总数,我们就需要用一个 Handler 的实例来横跨所有的 socketChannel 来统计所有 socketChannel 业务报文数。

为了实现这一点,我们可以实现一个 MessageCountHandler,并且在 MessageCountHandler 上使用 Netty 的@Sharable 注解,然后在安装 MessageCountHandler 实例到 pipeline 时,共用一个即可。当然,因为 MessageCountHandler 实例是共享的,所以在实现 MessageCountHandler 的统计功能时,请务必注意线程安全,我们在具体实现时就使用了 Java 并发编程里的 Atomic 类来保证这一点。

3.4.6 资源管理和 SimpleChannelInboundHandler

回想一下我们在 NIO 中是如何接收和发送网络数据的?都是首先创建了一个 Buffer,应用程序中的业务部分和 Channel 之间通过 Buffer 进行数据的交换:

Netty 在处理网络数据时,同样也需要 Buffer,在 Read 网络数据时由 Netty 创建 Buffer, Write 网络数据时 Buffer 往往是由业务方创建的。不管是读和写,Buffer 用完后都必须进行释放,否则可能会造成内存泄露。

在 Write 网络数据时,可以确保数据被写往网络了,Netty 会自动进行 Buffer 的释放, 但是如果 Write 网络数据时,我们有 outBoundHandler 处理了 write()操作并丢弃了数据,没有继续往下写,要由我们负责释放这个 Buffer,就必须调用 ReferenceCountUtil.release 方法, 否则就可能会造成内存泄露。

在 Read 网络数据时,如果我们可以确保每个 InboundHandler 都把数据往后传递了,也就是调用了相关的 fireChannelRead 方法,Netty 也会帮我们释放,同样的,如果我们有 InboundHandler 处理了数据,又不继续往后传递,又不调用负责释放的 ReferenceCountUtil.release 方法,就可能会造成内存泄露。

但是由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个类实现的channelRead方法会在数据被 channelRead0()方法消费之后自动释放数据。

同时系统为我们提供的各种预定义 Handler 实现,都实现了数据的正确处理,所以我们自行在编写业务 Handler 时,也需要注意这一点:要么继续传递,要么自行释放

3.5 内置通信传输模式

Netty框架提供了很多通信传输的模式:

  • NIO

io.netty.channel.socket.nio 使用 java.nio.channels 包作为基础——基于选择器的方式。也就是这个模式底层是通过JDK提供的NIO实现的。目前我们使用Netty,主要还是使用NIO的通信模式,因为在Linux环境下,NIO通信模式使用的更广泛。

  • Epoll

io.netty.channel.epoll 由 JNI 驱动的 epoll()和非阻塞 IO。这个传输支持只有在 Linux 上可用的多种特性,如 SO_REUSEPORT,比 NIO 传输更快,而且是完全非阻塞的。所以EPOLL模式,是Netty自行实现的NIO。Netty自己去调用Linux底层的epoll来实现NIO。将 NioEventLoopGroup 替换为 EpollEventLoopGroup , 并且将 NioServerSocketChannel.class 替换为 EpollServerSocketChannel.class 即可。

  • OIO

io.netty.channel.socket.oio 使用 java.net 包作为基础——使用阻塞流。它其实就是BIO。

  • Local

io.netty.channel.local 可以在 VM 内部通过管道进行通信的本地传输。这个用的很少。

  • Embedded

io.netty.channel.embedded Embedded 传输,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。在测试 ChannelHandler 实现时非常有用。这个模式一般是用在单元测试的时候。

NIOEpoll通信传输模式的对比

JDK NIO因为为了兼顾一次编写多处运行的特性,其实是牺牲了一些性能的。所以使用Netty自己实现的NIO,通过直接调用Linux中的epoll来实现NIO,会有更好的性能。但是使用Netty实现的NIO,无法做到在Windows本地调试,因为只能在Linux环境下运行。Netty对其实现的包如下:

3.6 Bootstrap(引导类/启动类)

Bootstrap就相当于Netty启动类,客户端和服务端必须要有。

网络编程里,“服务器”和“客户端”实际上表示了不同的网络行为;换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。

因此,有两种类型的Bootstrap:一种用于客户端(简单地称为 Bootstrap),而另一种 (ServerBootstrap)用于服务器。无论你的应用程序使用哪种协议或者处理哪种类型的数据, 唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。

比较 Bootstrap 类:

Bootstrap  

ServerBootstrap

网络编程中的作用

连接到远程主机和端口

绑定到一个本地端口

EventLoopGroup 的数目

1

2

ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的。

第二个区别可能更加明显。一个Bootstrap只需要一个 EventLoopGroup,但是一个 ServerBootstrap 则需要两个EventLoopGroup(也可以是同一个实例,即一个EventLoopGroup)。

因为服务器需要两组不同的 Channel。第一组将只包含一个 ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel。

与 ServerChannel 相关联的 EventLoopGroup 将分配一个负责为传入连接请求创建 Channel 的 EventLoop。一旦连接被接受,第二个 EventLoopGroup 就会给它的 Channel 分配一个 EventLoop。也就是第一个EventLoopGroup提供用于创建Channel的EventLoop,第二个EventLoopGroup提供用于为Channel处理事件的EventLoop

3.7 ChannelInitializer

ChannelInitializer是Netty 提供的一个特殊的 ChannelInboundHandlerAdapter 子类:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

它定义了下面的方法:

protected abstract void initChannel(C ch) throws Exception;

这个方法提供了一种将多个 ChannelHandler 添加到一个 ChannelPipeline 中的简便方法。 你只需要简单地向 Bootstrap 或 ServerBootstrap 的实例提供你的 ChannelInitializer 实现即可,并且一旦 Channel 被注册到了它的 EventLoop 之后,就会调用你的 initChannel()版本。 在该方法返回之后,ChannelInitializer 的实例将会从 ChannelPipeline 中移除它自己。ChannelInitializer本身也是一个Handler。

在之前我们写Netty样例代码的时候,就已经使用过ChannelInitializer了,就是通过它将多个Handler批量添加到Bootstrap中的。

ChannelInitializer这个类还用@Sharable修饰了,是一个共享的Handler。

在我们自己的应用程序中,如果存在着某个 handler 只使用一次的情况,也可以仿造 ChannelInitializer,用完以后将自己从 ChannelPipeline 中移除自己,比如授权 handler, 某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权 handler 移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权 handler 依然会被安装到 ChannelPipeline ,依然会进行授权检查。

3.8 ChannelOption

Netty中,ChannelOption用于设置TCP参数。ChannelOption 的各种属性在套接字选项中都有对应。

在构造BootStrap的时候,就可以设置ChannelOption参数。

SO、IP、TCP开头的参数,一般就是和TCP相关的参数。

3.8.1 ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG 对应的是 tcp/ip 协议 listen 函数中的 backlog 参数,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理。所以操作系统里一般有两个队列,一个是 ACCEPT 队列,保存着已经完成了 TCP 的三次握手的连接,一个 SYN 队列, 服务器正在等待 TCP 的三次握手完成的队列。

BSD派生系统里backlog指的就是SYN队列的大小,在Linux的实现里backlog相对来说, 就含糊不清了,有些内核版本指的是 ACCEPT 队列+SYN 队列合起来的大小,有的是指 SYN 队列的大小。

但是从 Linux 2.2 开始,backlog 的参数行为在 Linux 2.2 中发生了变化,现在它指定等待接受的完全建立的套接字的队列长度,而不是不完整的连接请求的数量。 不完整套接字队列的最大长度可以使用 /proc/sys/net/ipv4/tcp_max_syn_backlog 设置,默认值为 128。

如果 backlog 参数大于 /proc/sys/net/core/somaxconn 中的值,那么它会被静默截断为值 128。在 2.4.25 之前的内核中,此限制是硬编码值,后续内核版本也可以通过 vim /etc/sysctl.conf 来修改,包括我们前面所说的 tcp_max_syn_backlog 也可以在此处修改,然后 通过命令 sysctl -p 生效。

3.8.2 ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,多网卡(IP)绑定相同端口,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置 SO_REUSEADDR 就无法正常使用该端口。

但是注意,这个参数无法做到让应用绑定完全相同 IP + Port 来重复启动。

3.8.3 ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE 参数对应于套接字选项中的 SO_KEEPALIVE,该参数用于设置 TCP 连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文。

3.8.4 ChannelOption.SO_SNDBUF ChannelOption.SO_RCVBUF

  • ChannelOption.SO_SNDBUF 参数对应于套接字选项中的 SO_SNDBUF。
  • ChannelOption.SO_RCVBUF 参数对应于套接字选项中的 SO_RCVBUF。

这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功;发送缓冲区用于保存发送数据,直到发送成功。

3.8.5 ChannelOption.SO_LINGER

ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞 close()的调用时间,直到数据完全发送。

3.8.6 ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY 参数对应于套接字选项中的 TCP_NODELAY,该参数的使用与 Nagle 算法有关,Nagle 算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用 Nagle 算法,使用于小数据即时传输,与 TCP_NODELAY 相对应的是 TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

3.9 ByteBuf缓冲区类

ByteBuf API 的优点:

  • 它可以被用户自定义的缓冲区类型扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长(类似于 JDK 的 StringBuilder);
  • 在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法;
  • 读和写使用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化。

ByteBuf 维护了两个不同的索引,名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或者 get 开头的操作则不会。

如果打算读取字节直到 readerIndex 达到和 writerIndex 同样的值时会发生什么。在那时,你将会到达“可以读取的”数据的末尾。就如同试图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个 IndexOutOf-BoundsException。

可以指定 ByteBuf 的最大容量。试图移动写索引(即 writerIndex)超过这个值将会触发一个异常。(默认的限制是 Integer.MAX_VALUE)

3.9.1 使用模式

3.9.1.1 堆缓冲区

最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中。这种模式被称为支撑数组 (backing array),它能在没有使用池化的情况下提供快速的分配和释放。可以由 hasArray() 来判断检查 ByteBuf 是否由数组支撑。如果不是,则这是一个直接缓冲区。

3.9.1.2 直接缓冲区

直接缓冲区是另外一种 ByteBuf 模式。

直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。但是它的有点就是读写速度更快。

3.9.1.3 复合缓冲区

复合缓冲区 CompositeByteBuf,它为多个 ByteBuf 提供一个聚合视图。比如 HTTP 协议, 分为消息头和消息体,这两部分可能由应用程序的不同模块产生,各有各的 ByteBuf,将会在消息被发送的时候组装为一个 ByteBuf,此时可以将这两个 ByteBuf 聚合为一个 CompositeByteBuf,然后使用统一和通用的 ByteBuf API 来操作。

3.9.2 分配

如何在我们的程序中获得 ByteBuf 的实例,并使用它呢?Netty 提供了两种方式。

3.9.2.1 ByteBufAllocator 接口

Netty 通过 interface ByteBufAllocator 分配我们所描述过的任意类型的 ByteBuf 实例。

名称

描述

buffer()

返回一个基于堆或者直接内存存储的 ByteBuf

heapBuffer()

返回一个基于堆内存存储的 ByteBuf

directBuffer()

返回一个基于直接内存存储的 ByteBuf

compositeBuffer()

返回一个可以通过添加最大到指定数目的基于堆的或者直接内存存储的缓冲区来扩展的 CompositeByteBuf

ioBuffer()

返回一个用于套接字的 I/O 操作的 ByteBuf,当所运行的环境具有 sun.misc.Unsafe 支持时,返回基于直接内存存储的 ByteBuf, 否则返回基于堆内存存储的 ByteBuf;当指定使用PreferHeapByteBufAllocator 时,则只会返回基于堆内存存储的 ByteBuf。

可以通过 Channel(每个都可以有一个不同的 ByteBufAllocator 实例)或者绑定到 ChannelHandler 的 ChannelHandlerContext 获取一个到 ByteBufAllocator 的引用。

Netty 提供了两种 ByteBufAllocator 的实现:PooledByteBufAllocatorUnpooled-ByteBufAllocator。前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化 ByteBuf 实例,并且在每次它被调用时都会返回一个新的实例。

Netty4.1 默认使用了 PooledByteBufAllocator。

3.9.2.2 Unpooled 缓冲区

Netty 提供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf 实例。

API

  • buffer() 返回一个未池化的基于堆内存存储的 ByteBuf
  • directBuffer()返回一个未池化的基于直接内存存储的 ByteBuf
  • wrappedBuffer() 返回一个包装了给定数据的 ByteBuf
  • copiedBuffer() 返回一个复制了给定数据的 ByteBuf
  • Unpooled 类还可用于 ByteBuf 同样可用于那些并不需要 Netty 的其他组件的非网络项目。

3.9.3 随机访问索引/顺序访问索引/读写操作

如同在普通的 Java 字节数组中一样,ByteBuf 的索引是从零开始的:第一个字节的索引是 0,最后一个字节的索引总是 capacity() - 1。使用那些需要一个索引值参数(随机访问, 也即是数组下标)的方法(的其中)之一来访问数据既不会改变 readerIndex 也不会改变 writerIndex。如果有需要,也可以通过调用 readerIndex(index)或者 writerIndex(index)来手动移动这两者。顺序访问通过索引访问。

有两种类别(随机访问和顺序访问)的读/写操作:

  • get() 和 set()操作,从给定的索引开始,并且保持索引不变;get+数据字长 (bool,byte,int,short,long,bytes)
  • read() 和 write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。

更多的操作:

  • isReadable():如果至少有一个字节可供读取,则返回 true
  • isWritable():如果至少有一个字节可被写入,则返回 true
  • readableBytes():返回可被读取的字节数 writableBytes() 返回可被写入的字节数
  • capacity():返回 ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直到达到 maxCapacity()
  • maxCapacity():返回 ByteBuf 可以容纳的最大字节数
  • hasArray():如果 ByteBuf 由一个字节数组支撑,则返回 true
  • array():如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个 UnsupportedOperationException 异常

3.9.4 可丢弃字节

可丢弃字节的分段包含了已经被读过的字节。通过调用 discardReadBytes()方法,可以丢弃它们并回收空间。这个分段的初始大小为 0,存储在 readerIndex 中,会随着 read 操作的执行而增加(get*操作不会移动 readerIndex)。

缓冲区上调用 discardReadBytes()方法后,可丢弃字节分段中的空间已经变为可写的了。 频繁地调用 discardReadBytes()方法以确保可写分段的最大化,但是请注意,这将极有可能会 导致内存复制,因为可读字节必须被移动到缓冲区的开始位置。建议只在有真正需要的时候 才这样做,例如,当内存非常宝贵的时候。

3.9.5 可读字节

ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的 readerIndex 值为 0。

3.9.6 可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的 writerIndex 的默认值为 0。任何名称以 write 开头的操作都将从当前的 writerIndex 处开始 写数据,并将它增加已经写入的字节数。

3.9.7 索引管理

调用 markReaderIndex()、markWriterIndex()、resetWriterIndex()和 resetReaderIndex()来标记和重置 ByteBuf 的 readerIndex 和 writerIndex。

也可以通过调用 readerIndex(int)或者 writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException。

可以通过调用 clear()方法来将 readerIndex 和 writerIndex 都设置为 0。注意,这并不会清除内存中的内容。

3.9.8 查找操作

在 ByteBuf 中有多种可以用来确定指定值的索引的方法。最简单的是使用 indexOf()方法。

较复杂的查找可以通过调用 forEachByte()。

下面的代码展示了一个查找回车符(\r)的例子。

ByteBuf buffer = ...;

int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);

 

 

3.9.9 派生缓冲区

派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

  • duplicate();
  • slice();
  • slice(int, int);
  • Unpooled.unmodifiableBuffer(…);
  • order(ByteOrder);
  • readSlice(int)。

每个这些方法都将返回一个新的 ByteBuf 实例,它具有自己的读索引、写索引和标记索引。其内部存储和 JDK 的 ByteBuffer 一样也是共享的。

ByteBuf 复制如果需要一个现有缓冲区的真实副本,请使用 copy()或者 copy(int, int)方法。不同于派生缓冲区,由这个调用所返回的 ByteBuf 拥有独立的数据副本。

3.9.10 引用计数

引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。Netty 在第 4 版中为 ByteBuf 引入了引用计数技术, interface ReferenceCounted。

3.9.11 工具类

ByteBufUtil 提供了用于操作 ByteBuf 的静态的辅助方法。因为这个 API 是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。

这些静态方法中最有价值的可能就是 hexdump()方法,它以十六进制的表示形式打印 ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记录 ByteBuf 的内容。 十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六 进制的版本还可以很容易地转换回实际的字节表示。

另一个有用的方法是 boolean equals(ByteBuf, ByteBuf),它被用来判断两个 ByteBuf 实例的相等性。

3.9.12 资源释放

当某个 ChannelInboundHandler 的实现重写 channelRead()方法时,它要负责显式地释放与池化的 ByteBuf 实例相关的内存。Netty 为此提供了一个实用方法 ReferenceCountUtil.release()。

Netty 将使用 WARN 级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用 SimpleChannelInboundHandler,SimpleChannelInboundHandler 会自动释放资源。

  1. 对于入站请求,Netty 的 EventLoo 在处理 Channel 的读操作时进行分配 ByteBuf,对于这类 ByteBuf,需要我们自行进行释放,有三种方式: 或者使用 SimpleChannelInboundHandler; 或者在重写 channelRead()方法使用 ReferenceCountUtil.release() 或者在重写 channelRead()方法使用使用 ctx.fireChannelRead 继续向后传递;
  2. 对于出站请求,不管 ByteBuf 是否由我们的业务创建的,当调用了 write 或者 writeAndFlush 方法后,Netty 会自动替我们释放,不需要我们业务代码自行释放。

四、解决粘包和半包

TCP的粘包/半包(拆包)是面试中非常高频的问题。

4.1 什么是 TCP 粘包和半包?

假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下5种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,D1 和 D2 粘合在一起,被称为 TCP 粘包
  3. 服务端分两次读取到了两个数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这被称为 TCP 拆包
  4. 服务端分两次读取到了两个数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余内容 D1_2 和 D2 包的整包。
  5. 如果此时服务端 TCP 接收滑窗非常小(也就是服务端每次能接受的一个报文大小非常小),而数据包 D1 和 D2 比较大,很有可能会发生第五种可能,即服务端分多次才能将 D1 和 D2 包接收完全,期间发生多次拆包。

4.2 TCP 粘包/半包发生的原因

4.2.1 TCP粘包的出现原因

由于 TCP 协议本身的机制(面向连接的可靠地协议——三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么它本身会启用 Nagle 算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP 的网络延迟要 UDP 的高些)然后再发送(超时或者包大小足够就会发送)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象。

UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),它不会对数据包进行合并发送(也就没有 Nagle 算法之说了),它直接是一端发送什么数据,直接就发出去了,既然它不会对数据合并,每一个数据包都是完整的(数据+UDP 头+IP 头等等发一次数据封装一次)也就没有粘包一说了。

4.2.2 TCP半包的出现原因

分包(半包、拆包)产生的原因就简单的多:就是一个数据包被分成了多次接收。

更具体的原因至少包括:

  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  2. 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是 TCP 报文段的最大长度,而是:MSS=TCP 报文段长度-TCP 首部长度

4.3 解决粘包/半包的方案

由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案, 可以归纳如下:

  1. 在包尾增加分割符,比如回车换行符进行分割,例如 FTP 协议; 参见1、回车换行符进行分割(回车换行符进行分割)和 2、自定义分割符进行分割(自定义分割符)下的代码。
  2. 消息定长,例如每个报文的大小为固定长度 200 字节,如果不够,空位补空格; 参见 方案二:消息定长 下的代码。
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度) 的字段,通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度,使用 LengthFieldBasedFrameDecoder,后面会有详细说明和使用:6.3.2.1 LengthFieldBasedFrame 详解。

方案一:在包尾增加分割符

1、回车换行符进行分割

客户端:

/**
 * 客户端类
 */
public class LineBaseEchoClient {
    private final String host;
    public LineBaseEchoClient(String host) {
        this.host = host;
    }
    public void start() throws InterruptedException {、
        /*线程组*/
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /*客户端启动必须*/
            final Bootstrap b = new Bootstrap();
            b.group(group)/*将线程组传入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/
                    .handler(new ChannelInitializerImp()); // 添加ChannelInitializer这个Handler,用于批量添加Handler到pipeline中
            
            // 向服务端发起连接请求
            ChannelFuture f = b.connect().sync();
            System.out.println("已连接到服务器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
    /**
     * 批量向pipeline中添加Handler
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 要想使用回车换行符进行分割,就要添加这个Handler
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseClientHandler());
        }
    }
    /**
     * 启动客户端
     */
    public static void main(String[] args) throws InterruptedException {
        // 启动客户端
        new LineBaseEchoClient("127.0.0.1").start();
    }
}


/**
 * 客户端Handler类
 */
public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private AtomicInteger counter = new AtomicInteger(0);
    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
        //ctx.close();
    }
    /*** 客户端被通知channel活跃后会触发该方法
     * 这里就是开始发送数据
     * */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "Mark,zhuge,zhouyu,fox,loulan"
                + System.getProperty("line.separator"); // 用回车分割每一条消息。这是一个内置的回车符
        // 发送10次消息
        for(int i=0;i<10;i++){
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }
    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端:

/**
 * 服务端类
 */
public class LineBaseEchoServer {

    public static final int PORT = 9998;

    /**
     * 启动服务端
     */
    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("服务器即将启动");
        // 启动服务端
        lineBaseEchoServer.start();
    }

    public void start() throws InterruptedException {
        final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
            b.group(group)/*将线程组传入*/
                .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                所以下面这段代码的作用就是为这个子channel增加handle*/
                .childHandler(new ChannelInitializerImp());
            // 绑定要监听的端口号    
            ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
        } finally {
            group.shutdownGracefully().sync();/*优雅关闭线程组*/
        }
    }

    /**
     * 批量增加Handler
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 要想使用回车换行符进行分割,就要添加这个Handler
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }
}

/**
 * 服务端Handler类
 * 类说明:服务端的业务处理
 */
@ChannelHandler.Sharable
public class LineBaseServerHandler extends ChannelInboundHandlerAdapter {
    private AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端:["+ctx.channel().remoteAddress()+"]已连接.........");
    }

    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        // 因为添加了LineBasedFrameDecoder和LineBaseClientHandler,就会自动将消息按照回车分隔
        String request = in.toString(CharsetUtil.UTF_8);
        // 输出这是收到的第几次消息
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }

    /*** 服务端读取完成网络数据后的处理*/
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"即将关闭...");
    }
}

2、自定义分割符进行分割

客户端:

/**
 * 客户端类
 */
public class DelimiterEchoClient {
    private final String host;

    public DelimiterEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        /*线程组*/
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /*客户端启动必须*/
            final Bootstrap b = new Bootstrap();
            b.group(group)/*将线程组传入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已连接到服务器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // DelimiterEchoServer.DELIMITER_SYMBOL是我们在服务端类中定义的自定义分隔符
            // 我们需要先把自定义的分隔符转换为ByteBuf
            ByteBuf delimiter = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL.getBytes());
            // 使用自定义分隔符就需要添加DelimiterBasedFrameDecoder这个Handler,并且要向这个Handler传入我们的自定义分隔符
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

    /**
     * 启动客户端
     */
    public static void main(String[] args) throws InterruptedException {
        new DelimiterEchoClient("127.0.0.1").start();
    }
}


/**
 * 客户端Handler类
 */
public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private AtomicInteger counter = new AtomicInteger(0);
    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    }
    /*** 客户端被通知channel活跃后,做事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "Mark,zhuge,zhouyu,fox,loulan"
                + DelimiterEchoServer.DELIMITER_SYMBOL; // 自定义分隔符
        for(int i=0;i<10;i++){
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
            System.out.println("发送数据到服务器");
        }
    }
    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端:

/**
 * 服务端类
 */
public class DelimiterEchoServer {
    // 自定义分隔符
    public static final String DELIMITER_SYMBOL = "@~";

    public static final int PORT = 9997;

    /**
     * 启动服务端
     */
    public static void main(String[] args) throws InterruptedException {
        DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("服务器即将启动");
        delimiterEchoServer.start();
    }

    public void start() throws InterruptedException {
        final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
        /*线程组*/
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /*服务端启动必须*/
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)/*将线程组传入*/
                .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                所以下面这段代码的作用就是为这个子channel增加handle*/
                .childHandler(new ChannelInitializerImp());
            /*异步绑定到服务器,sync()会阻塞直到完成*/
            ChannelFuture f = b.bind().sync();
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            /*阻塞直到服务器的channel关闭*/
            f.channel().closeFuture().sync();
        } finally {
            /*优雅关闭线程组*/
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 将自定义分隔符转换为ByteBuf
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL.getBytes());
            // 使用自定义分隔符就需要添加DelimiterBasedFrameDecoder这个Handler,并且要向这个Handler传入我们的自定义分隔符
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }
}


/**
 * 服务端Handler类
 * 类说明:自己的业务处理
 */
// 将该Handler设置成了共享Handler
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
    private AtomicInteger counter = new AtomicInteger(0);
    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        String request = in.toString(CharsetUtil.UTF_8);
        // 输出这是收到的第几次消息
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        // 输出收到的消息,自动按照自定义分隔符进行分割
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + DelimiterEchoServer.DELIMITER_SYMBOL;
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
        //ctx.close();
    }
    /*** 服务端读取完成网络数据后的处理*/
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelReadComplete------");
        //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

方案二:消息定长

客户端:

/**
 * 客户端类
 */
public class FixedLengthEchoClient {
    // 要发送的数据内容
    public final static String REQUEST = "Mark,zhuge,zhouyu,fox,loulan";
    private final String host;

    public FixedLengthEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        /*线程组*/
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /*客户端启动必须*/
            final Bootstrap b = new Bootstrap();
            b.group(group)/*将线程组传入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
                    .remoteAddress(new InetSocketAddress(host,FixedLengthEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/
                    .handler(new ChannelInitializerImp());
            // 向服务端发起连接请求
            ChannelFuture f = b.connect().sync();
            System.out.println("已连接到服务器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    /**
     * 批量添加Handler
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 要想开启定长报文,就要添加FixedLengthFrameDecoder这个Handler
            // 还要传入我们要定长的长度,这里我们就用要发送消息的长度作为定长长度(FixedLengthEchoServer.RESPONSE.length())
            ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoServer.RESPONSE.length()));
            // 添加服务端的业务Handler
            ch.pipeline().addLast(new FixedLengthClientHandler());
        }
    }

    /**
     * 启动客户端
     */
    public static void main(String[] args) throws InterruptedException {
        new FixedLengthEchoClient("127.0.0.1").start();
    }
}


/**
 * 客户端Handler类
 */
public class FixedLengthClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客户端读取到网络数据后的处理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    }

    /*** 客户端被通知channel活跃后出发执行。这里就是发送消息*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        // 发送十次数据
        for(int i=0;i<10;i++){
            // 利用Unpooled创建固定长度的ByteBuf消息对象
            msg = Unpooled.buffer(FixedLengthEchoClient.REQUEST.length());
            // 向msg中写入要发送的消息
            msg.writeBytes(FixedLengthEchoClient.REQUEST.getBytes());
            // 发送消息
            ctx.writeAndFlush(msg);
        }
    }

    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端:

/**
 * 服务端类
 */
public class FixedLengthEchoServer {
    // 要返回给客户端的消息
    public static final String RESPONSE = "Welcome to Netty!";
    public static final int PORT = 9996;

    /**
     * 启动服务端
     */
    public static void main(String[] args) throws InterruptedException {
        FixedLengthEchoServer fixedLengthEchoServer = new FixedLengthEchoServer();
        System.out.println("服务器即将启动");
        fixedLengthEchoServer.start();
    }

    public void start() throws InterruptedException {
        final FixedLengthServerHandler serverHandler = new FixedLengthServerHandler();
        /*线程组*/
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            /*服务端启动必须*/
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)/*将线程组传入*/
                .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
                .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
                /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
                所以下面这段代码的作用就是为这个子channel增加handle*/
                .childHandler(new ChannelInitializerImp());
            /*异步绑定到服务器,sync()会阻塞直到完成*/    
            ChannelFuture f = b.bind().sync();
            System.out.println("服务器启动完成,等待客户端的连接和数据.....");
            /*阻塞直到服务器的channel关闭*/
            f.channel().closeFuture().sync();
        } finally {
            /*优雅关闭线程组*/
            group.shutdownGracefully().sync();
        }
    }

    /**
     * 批量加入Handler
     */
    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // 添加定长消息Handler,并且要传入定长的长度。这里传入的就是客户端发来消息的长度
            ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));
            // 添加服务端业务Handler
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }
}

/**
 * 服务端Handler
 */
// 这里设置为了共享Handler
@ChannelHandler.Sharable
public class FixedLengthServerHandler extends ChannelInboundHandlerAdapter {
    private AtomicInteger counter = new AtomicInteger(0);
    /*** 服务端读取到网络数据后的处理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        // 自动获取到定长数据
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        // 返回给客户端响应消息        
        ctx.writeAndFlush(Unpooled.copiedBuffer(FixedLengthEchoServer.RESPONSE.getBytes()));
    }
    /*** 发生异常后的处理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

方案三:消息头中包含表示消息总长度(或者消息体长度) 的字段

这个方案的示例代码在后面的章节会进行讲解。

4.4 辨析 channelRead channelReadComplete

这两个方法都是入站用的。

两者的区别:

  • Netty 是在读到完整的业务请求报文后才调用一次业务 ChannelHandler 的 channelRead 方法,无论这条报文底层经过了几次 SocketChannel 的 read 调用。
  • 但是 channelReadComplete 方法并不是在业务语义上的读取消息完成后被触发的,而是在每次从 SocketChannel 成功读到消息后,由系统触发,也就是说如果一个业务消息被 TCP 协议栈发送了 N 次,则服务端的 channelReadComplete 方法就会被调用 N 次。

我们用代码来验证一下:

在客户端发送 5 次较小报文

服务端输出:

很明显,channelRead 是一个报文执行一次,执行的执行次数和客户端发送报文数一样(因为只有当获取到一个完整报文才会触发一次channelRead);channelReadComplete 虽然也执行了多次,但是和客户端发送报文数没什么关系,而且也没什么规律可寻(因为TCP有可能将报文合并或者拆分发送,TCP发送一次数据才触发一次channelReadComplete,并不一定是有多少个报文数,就发送几次数据,有可能将多个报文合并在一起发送或者拆分成多次发送)。

五、编解码器框架

5.1 什么是编解码器

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区别是什么呢?

如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列——它的数据。 那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流。在网络传输的过程中,不管什么消息基本都是以01二进制数据的形式进行传输的);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。我们前面所学的解决粘包半包的方案其实也是编解码器框架的一部分。

5.2 解码器

主要有两种解码器类型:

  • 将字节解码为消息——ByteToMessageDecoder
  • 将一种消息类型解码为另一种——MessageToMessageDecoder

因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以 Netty 的解码器实现了对应的ChannelInboundHandler。

什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个 ChannelInboundHandler 转换入站数据时会用到。此外,得益于 ChannelPipeline 的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。

比如一个实际的业务场景,两端通信,通过 JSON 交换信息,而且 JSON 文本需要加密, 接收端就可以:

网络加密报文 -> 经过 ByteToMessageDecoder -> String 类型的 JSON 明文-> 经过MessageToMessageDecoder -> Java 里的对象

所以我们可以把 ByteToMessageDecoder 看成一次解码器,MessageToMessageDecoder 看成二次或者多次解码器。

5.2.1 将字节数据解码为消息

抽象类 ByteToMessageDecoder

将字节数据解码为消息(或者另一个字节序列)是一项如此常见的任务,Netty为它提供了一个抽象的基类:ByteToMessageDecoder。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。

它最重要方法:

decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out)

该方法是必须实现的唯一抽象方法。decode()方法被调用时将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码后消息的 List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者该 ByteBuf 中没有更多可读取的字节时为止。然后,如果该 List 不为空,那么它的内容将会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。

5.2.2 将一种消息类型解码为另一种

MessageToMessageDecoder<T>,T 代表源数据的类型

在两个消息格式之间进行转换(例如,从 String->Integer)

// I表示的就是数据源类性

decode(ChannelHandlerContext ctx,I msg,List<Object> out)

 

对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给 ChannelPipeline 中的下一个 ChannelInboundHandler。

5.2.3 TooLongFrameException

由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。但是又不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

为了避免这种情况,你可以设置一个最大缓冲字节数的阈值,如果超出该阈值,则会导致抛出一个 TooLongFrameException(随后会被 ChannelHandler.exceptionCaught()方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。

5.3 编码器

解码器的功能正好相反。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:

  • 将消息编码为字节:MessageToByteEncoder<I>
  • 将一种类型的消息编码为另一种类型的消息:MessageToMessageEncoder<T>,T 代表源数据的类型

还是用我们上面的业务场景,两端通信,通过 JSON 交换信息,而且 JSON 文本需要加密,发送端就可以:

Java 里的对象-> 经过 MessageToMessageEncoder -> String 类型的 JSON 文本 -> 经过MessageToByteEncoder-> 网络加密报文;

所以我们可以把 MessageToByteEncoder 看成网络报文编码器, MessageToMessageEncoder 看成业务编码器。

5.3.1 将消息编码为字节数据

encode(ChannelHandlerContext ctx,I msg,ByteBuf out)

encode()方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为 ByteBuf 的出站消息(类型为 I 的)。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下 一个ChannelOutboundHandler。

5.3.2 将一种消息编码为另一种消息

encode(ChannelHandlerContext ctx,I msg,List<Object> out)

这是需要实现的唯一方法。每个通过 write()方法写入的消息都将会被传递给 encode() 方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler。

5.4 编解码器类

我们一直将解码器和编码器作为单独的实体讨论,但是有时在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器-编码器对。这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口。

为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢?因为通过尽可能地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是 Netty 设计的一个基本原则。

相关的类:

  • 抽象类 ByteToMessageCodec
  • 抽象类 MessageToMessageCodec

通过继承抽象类 ByteToMessageCodec ,就可以做到即解码,又编码。

同样的这个抽象类MessageToMessageCodec,也可以做到相同的效果。

5.5 实战:实现 SSL/TLS Web 服务

Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了我们花费的时间与精力。

5.5.1 通过 SSL/TLS 保护 Netty 应用程序

SSL 和 TLS 这样的安全协议,它们层叠在其他协议之上,用以实现数据安全。我们在访问安全网站时遇到过这些协议,但是它们也可用于其他不是基于 HTTP 的应用程序,如安全 SMTP(SMTPS)邮件服务器甚至是关系型数据库系统。

为了支持 SSL/TLS,Java 提供了 javax.net.ssl 包,它的 SSLContext 和 SSLEngine 类使得实现解密和加密相当简单直接。Netty 通过一个名为 SslHandler 的 ChannelHandler 实现利用了这个 API,其中 SslHandler 在内部使用 SSLEngine 来完成实际的工作。

在大多数情况下,SslHandler 将是 ChannelPipeline 中的第一个 ChannelHandler。

5.5.2 HTTP 系列

HTTP 是基于请求/响应模式的:客户端向服务器发送一个 HTTP 请求,然后服务器将会返回一个 HTTP 响应。Netty 提供了多种编码器和解码器以简化对这个协议的使用。

一个HTTP 请求/响应可能由多个数据部分组成,FullHttpRequest 和 FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest、LastHttpContent 等等)都实现了 HttpObject 接口。

  • HttpRequestEncoder:将 HttpRequest、HttpContent 和 LastHttpContent 消息编码为字节
  • HttpResponseEncoder:将 HttpResponse、HttpContent 和 LastHttpContent 消息编码为字节
  • HttpRequestDecoder:将字节解码为 HttpRequest、HttpContent 和 LastHttpContent 消息
  • HttpResponseDecoder:将字节解码为 HttpResponse、HttpContent 和 LastHttpContent 消息
  • HttpClientCodec 和 HttpServerCodec 则将请求和响应做了一个组合。

5.5.2.1 聚合 HTTP 消息

由于 HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。 为了消除这项繁琐的任务,Netty 提供了一个聚合器 HttpObjectAggregator,它可以将多个消息部分合并为 FullHttpRequest 或者 FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容。

5.5.2.2 HTTP 压缩

当使用 HTTP 时,建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带来一些 CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说。Netty 为压缩和解压缩提供了 ChannelHandler 实现,它们同时支持 gzip 和 deflate 编码。

5.5.2.3 使用 HTTPS

启用 HTTPS 只需要将 SslHandler 添加到 ChannelPipeline 的 ChannelHandler 组合中。

5.5.3 实现步骤

  1. 首先实现 Http 服务器并浏览器访问;
  2. 增加 SSL 控制;
  3. 实现客户端并访问。

通过OptionalHandler实现根据客户端是否使用https访问来决定服务端是否接收https请求。

六、序列化问题

Java 序列化的目的主要有两个:

  1. 网络传输。网络传输的数据肯定要序列化,因为数据在网络传输的过程中,一定是以01格式传输的,这样传输效率更高。
  2. 对象持久化

当进行远程跨迸程服务调用时,需要把被传输的 Java 对象编码为字节数组或者 ByteBuffer 对象。而当远程服务读取到 ByteBuffer 对象或者字节数组时,需要将其解码为发送时的 Java 对象。这被称为 Java 对象编解码技术

Java 序列化仅仅是 Java 编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架。

6.1 Java 序列化的缺点

Java 序列化从 JDK1.1 版本就已经提供,它不需要添加额外的类库,只需实现 java.io.Serializable 并生成序列 ID 即可,因此,它从诞生之初就得到了广泛的应用。

但是在远程服务调用(RPC)时,很少直接使用 Java 序列化进行消息的编解码和传输, 这又是什么原因呢?下面通过分析Java序列化的缺点来找出答案。

1、无法跨语言

对于跨进程的服务调用,服务提供者可能会使用 C++ 或者其他语言开发,当我们需要和异构语言进程交互时 Java 序列化就难以胜任。由于 Java 序列化技术是 Java 语言内部的私有协议,其他语言并不支持,对于用户来说它完全是黑盒。对于 Java 序列化后的字节数组, 别的语言无法进行反序列化,这就严重阻碍了它的应用。

2、序列化后的码流太大

通过一个实例看下 Java 序列化后的字节数组大小。

空间(单位字节)

上面是JDK对一个Java对象进行序列化后的大小,下面是自己实现的一个简单序列化后的大小。很明显JDK自带的序列化后的空间占用还是很大。

3、序列化性能太低

无论是序列化后的码流大小,还是序列化的性能,JDK 默认的序列化机制表现得都很差。 因此,我们边常不会选择 Java 序列化作为远程跨节点调用的编解码框架。

时间

上面是JDK序列化的耗时,下面是自己实现的一个简单序列化的耗时。

通过上面的分析,可以看到JDK实现的序列化空间和时间性能全面落后于自己随便实现的一个序列化。所以现在很少有直接使用JDK序列化的 Serializable。用它唯一的好处也就是JDK原生提供,不需要很多其他的类库。

6.2 如何选择序列化框架

6.2.1 选择四要点

  1. 是否需要跨语言的支持
  2. 空间:编码后占用空间
  3. 时间:编解码速度
  4. 是否追求可读性

如果项目里有跨语言支持的硬性要求,某种序列化框架只支持特定语言,即使它比其他的框架快1万倍,也没法选择。

空间和时间其实是对序列化框架的性能要求,这两者其实是存在矛盾的,想要编码后占用空间小,自然就要花费更多的时间去编码,所以这两者往往要追求一种平衡。

有些项目里还要求序列化后的数据是人类可读的,这个时候的选择就不多了,一般是 JSON 格式或者 XML 格式,有些序列化框架也支持通过自带工具观察序列化后的数据,也可以考虑选择。

6.2.1 序列化框架比较

我们可以借鉴阿里技术官方的分析结果 (几种Java常用序列化框架的选型与对比-阿里云开发者社区

从跨语言来说:

还有一种没体现在上面的 msgpack(https://msgpack.org/)也是支持跨语言的。

从性能上来说:

从空间性能来看:

avro、kryo、Hessian2、fst、Protocol buffer 表现都不错;

从时间性能来看:

kryo、fst、Protocol buffer 表现也很好

Msgpack 也是一种很优秀的序列化框架,性能和 Protocol buffer 不相上下。

6.3 LengthFieldBasedFrame 详解

LengthFieldBasedFrame可以使报文中带上长度,这个可以用来解决粘包半包问题。

该类主要的参数有:

  • maxFrameLength:表示的是包的最大长度,
  • lengthFieldOffset:指的是长度域的偏移量,表示跳过指定个数字节之后的才是长度域;表示包长度的字段并不一定在报文的最前面,可能在中间的任意位置,要用偏移量来表示它的所在位置。
  • lengthFieldLength:记录该帧数据长度的字段,也就是长度域本身的长度;
  • lengthAdjustment:长度的一个修正值,可正可负,Netty 在读取到数据包的长度值 N 后, 认为接下来的 N 个字节都是需要读取的,但是根据实际情况,有可能需要增加 N 的值,也 有可能需要减少 N 的值,具体增加多少,减少多少,写在这个参数里;
  • initialBytesToStrip:从数据帧中跳过的字节数,表示得到一个完整的数据包之后,扔掉 这个数据包中多少字节数,才是后续业务实际需要的业务数据。
  • failFast:如果为 true,则表示读取到长度域,TA 的值的超过 maxFrameLength,就抛出 一个 TooLongFrameException,而为 false 表示只有当真正读取完长度域的值表示的字节之后, 才会抛出 TooLongFrameException,默认情况下设置为 true,建议不要修改,否则可能会造成内存溢出。

下面我们有几个例子来带大家深入理解这些参数。下面说的参数顺序就是按照这个来的。

绿色是业务数据,黄色是长度字段。一个方格就是一个字节。

情况一:数据包大小: 14B = 长度域 2B + "HELLO, WORLD"(单词 HELLO+一个逗号+一个空格+单 词 WORLD)

要求传给后面的应用的时候,原封不动地将数据传过去。

长度域的值为 12B(0x000c)。希望解码后保持一样,根据上面的公式,参数应该为:

  1. lengthFieldOffset = 0
  2. lengthFieldLength = 2
  3. lengthAdjustment 无需调整,上面的例子就为0
  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

情况二:数据包大小: 14B = 长度域 2B + "HELLO, WORLD"

要求传给后面应用的时候把记录长度的部分去除掉

长度域的值为 12B(0x000c)。解码后,希望丢弃长度域 2B 字段,所以,只要 initialBytesToStrip = 2 即可。

  1. lengthFieldOffset = 0
  2. lengthFieldLength = 2
  3. lengthAdjustment 无需调整,上面的例子就是0
  4. initialBytesToStrip = 2 解码过程中,丢弃 2 个字节的数据

情况三:数据包大小: 14B = 长度域 2B + "HELLO, WORLD"。长度域的值为 14(0x000E)

这个情况前面的长度变成了0E,十进制就是14,所以很明显,前面的0c=12表示的是数据的长度,但是现在0E表示的是整个数据包的长度。

长度域的值为 14(0x000E),包含了长度域本身的长度。希望解码后保持一样,根据上面 的公式,参数应该为:

  1. lengthFieldOffset = 0
  2. lengthFieldLength = 2
  3. lengthAdjustment = -2 因为长度域为 14,而报文内容为 12,为了防止读取报文超出 报文本体,和将长度字段一起读取进来,需要告诉 netty,实际读取的报文长度比长度域中 的要少 2(12-14=-2)
  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

情况四:在长度域前添加 2 个字节的 Header。长度域的值(0x00000C) = 12。总数据包长度: 17=Header(2B) + 长度域(3B) + "HELLO, WORLD"

这个报文最前面蓝色的部分称为“魔数”,一般是用于标识协议类型的。

长度域的值为 12B(0x000c)。编码解码后,长度保持一致,所以 initialBytesToStrip = 0。 参数应该为:

  1. lengthFieldOffset = 2
  2. lengthFieldLength = 3
  3. lengthAdjustment = 0 无需调整
  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

情况五:Header 与长度域的位置换了。总数据包长度: 17=长度域(3B) + Header(2B) + "HELLO, WORLD"

这个长度字段跑到了最前面,魔数字段放在了长度字段的后面。

长度域的值为 12B(0x000c)。编码解码后,长度保持一致,所以 initialBytesToStrip = 0。 参数应该为:

  1. lengthFieldOffset = 0
  2. lengthFieldLength = 3
  3. lengthAdjustment = 2 因为长度域为 12,而报文内容为 12,但是我们需要把 Header 的值一起读取进来,需要告诉 netty,实际读取的报文内容长度比长度域中的要多 2(12+2=14)
  4. initialBytesToStrip = 0 - 解码过程中,没有丢弃任何数据

情况六:带有两个 header。HDR1 丢弃,长度域丢弃,只剩下第二个 header 和有效包体,这种 协议中,一般 HDR1 可以表示 magicNumber,表示应用只接受以该 magicNumber 开头的二 进制数据,rpc里面用的比较多。总数据包长度: 16=HDR1(1B)+长度域(2B) +HDR2(1B) + "HELLO, WORLD"

长度字段左右两边有了魔数字段,传递给后面的应用时要求把首部和长度字段去掉,其余的传到后面去。

长度域的值为 12B(0x000c)

  1. lengthFieldOffset = 1 (HDR1 的长度)
  2. lengthFieldLength = 2
  3. lengthAdjustment =1 因为长度域为 12,而报文内容为 12,但是我们需要把 HDR2 的 值一起读取进来,需要告诉 netty,实际读取的报文内容长度比长度域中的要多 1(12+1=13)
  4. initialBytesToStrip = 3 丢弃了 HDR1 和长度字段(因为要去掉前三个字节,所以这里要写3)

情况七:带有两个 header,HDR1 丢弃,长度域丢弃,只剩下第二个 header 和有效包体。总数 据包长度: 16=HDR1(1B)+长度域(2B) +HDR2(1B) + "HELLO, WORLD"

这个的长度字段时10,转换为十进制就是16。同样要把前三个字节去掉。

  1. 长度域的值为 16B(0x0010),长度为 2,HDR1 的长度为 1,HDR2 的长度为 1,包体的长 度为 12,1+1+2+12=16。 lengthFieldOffset = 1
  2. lengthFieldLength = 2
  3. lengthAdjustment = -3 因为长度域为 16,需要告诉 netty,实际读取的报文内容长度比 长度域中的要 少 3(13-16= -3)
  4. initialBytesToStrip = 3 丢弃了 HDR1 和长度字段

相关推荐

  1. React---函数组件hook

    2024-03-19 22:56:03       39 阅读
  2. Optional详解API

    2024-03-19 22:56:03       8 阅读
  3. 深入解析Spring Boot注解组件(上)

    2024-03-19 22:56:03       7 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-19 22:56:03       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-19 22:56:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-19 22:56:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-19 22:56:03       20 阅读

热门阅读

  1. el-input添加keyup事件无响应

    2024-03-19 22:56:03       18 阅读
  2. 掘根宝典之c++标识符,命名

    2024-03-19 22:56:03       21 阅读
  3. 爬虫基本原理实现以及问题解决

    2024-03-19 22:56:03       21 阅读
  4. 系统架构设计师笔记第37期:数据访问层设计

    2024-03-19 22:56:03       16 阅读
  5. PyTorch学习笔记之基础函数篇(十二)

    2024-03-19 22:56:03       16 阅读
  6. [LLM]大模型八股知识点(一)

    2024-03-19 22:56:03       17 阅读
  7. 常见的几个Python技术难题

    2024-03-19 22:56:03       17 阅读