初识Netty
概述
为什么要使用Netty
我们在使⽤NIO做同步⾮阻塞式的⽹络通信时,会发现NIO实现起来⾮常复杂。整个NIO的过程包含着繁琐的步骤,且需要同时掌握好各种API的使⽤,⽐如Selector、ServerSocketChannel、SocketChannel及ByteBuffer等。
从之前的几篇关于NIO的文章可以看出,还没有实现业务,光写整个流程就⾮常繁琐。NIO除了实现起来复杂之外,还存在⼀些需要解决的棘⼿问题,⽐如客户端断线重连如何实现,⼼跳处理、半包读写处理等等⼀些列问题,此时需要有这么⼀个框架,⽤于解决和优化NIO存在的问题,它就是Netty。
Netty的应用场景
Netty的应⽤场景⾮常⼴泛,在各⼤领域都需要Netty强有⼒的⽀撑,⽐如⼈⼯智能物联⽹(AIOT)平台、互联⽹应⽤、游戏⾏业和⼤数据领域等。
⼈⼯智能物联⽹(AIOT)平台
在物联⽹的世界⾥,将物体连上⽹络,以充分发挥物体在互联⽹中的主体功能。这也是物联⽹存在的具体表象。当某⼀个物体连上云端以后,物体的操作维度和复杂度就⼤⼤提升了。这样的例⼦有很多,⽐如⻋联⽹、传感器联⽹、各种硬件设备联⽹等等,通过搭配不同的应⽤场景,可以实现不同的需求。此时需要解决⼀个棘⼿的问题,在AIOT平台中,硬件设备与平台的通信是⾮常频繁的,换句话物联⽹的场景中数据量是⾮常庞⼤的,往往就是⽇均TB级别的数据量。这样的数据量是因为硬件设备与平台频繁通信造成的。此时会存在⼀个问题,如果硬件设备具备⼀定的数量,那么平台就有可能被⼤量硬件设备的极度频繁的通信⽽打爆。⽤⼀个更形象的数据来形容,⽐如⼀个硬件设备为了维持⼼跳数据,每隔5秒要和平台进⾏⼀次通信,那么在1万台设备的规模下,平台就需要接受每秒2000的并发请求。此时Netty可以很好的解决这样的频繁通信IO问题。
互联网行业
在微服务世界⾥,服务被拆分后,服务变得越来越多,服务和服务之间通信变得⾮常频繁。此时RPC框架解决了服务之间通信问题,⽽⽬前典型的RPC框架就是阿⾥的分布式服务框架Dubbo。Dubbo框架实现的RPC,其默认使⽤的Dubbo协议,⽽Dubbo协议的底层就是将Netty作为基础通信组件。并且,消息队列RocketMQ的底层也是⽤Netty作为基础通信组件。
游戏行业及大数据领域
Netty作为⾼性能通信组件,其提供的TCP/UDP和HTTP协议栈,有效⽀撑了⼿游及⼤型⽹游的服务端通信应⽤。在⼤数据领域,经典的Hadoop⾼性能通信和序列化组件Avro的RPC框架,默认采⽤的是Netty实现跨界点通信。
Netty初体验
Netty具备设计优雅、使⽤⽅便、性能强劲等优点,下⾯我们就快速体验⼀下Netty。
引入依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.35.Final</version> </dependency>
服务端
package com.my.netty.start; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @author zhupanlin * @version 1.0 * @description: Netty服务端 * @date 2024/1/28 9:33 */ public class NettyServer { public static void main(String[] args) { // 创建只处理客户端连接请求的线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建只处理客户端读写业务的线程组,参数默认值是CPU核心数的两倍 EventLoopGroup workGroup = new NioEventLoopGroup(10); // 创建服务端启动对象 ServerBootstrap bootstrap = new ServerBootstrap(); try { // 配置参数 bootstrap.group(bossGroup, workGroup) // 配置group // 使用NioServerSocketChannel作为服务器的通道实现 .channel(NioServerSocketChannel.class) // 配置用于存放因没有空闲线程导致连接请求被暂存放到队列中的队列长度 .option(ChannelOption.SO_BACKLOG, 1024) // 创建通道初始化的对象并配置该对象,向该对象中添加处理器来实现具体的业务处理 .childHandler(new ChannelInitializer<SocketChannel>() { // 初始化通道 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加处理器,处理器里面是真正处理业务的 socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("Netty服务端启动了"); // sync()同步阻塞地启动服务端,否则异步 //ChannelFuture channelFuture = bootstrap.bind(9090).sync(); ChannelFuture channelFuture = bootstrap.bind(9090); // 服务启动成功就会被调用 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("监听9090端口成功了"); }else { System.out.println("监听9090端口失败了"); } } }); // 只要服务没关闭,该方法会一直阻塞 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { // 断开所有连接并清理内存 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
服务端处理读写业务的Handler
package com.my.netty.start; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.StandardCharsets; /** * @author zhupanlin * @version 1.0 * @description: 自定义Handler 实现Netty处理的规范 * @date 2024/1/28 9:55 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 当有客户端发送数据来时该方法就会被调用 * @param ctx 上下文对象 含有Channel和pipeline的上下文对象 * @param msg 客户端发送来的数据 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送的数据:" + buf.toString(StandardCharsets.UTF_8)); } /** * 读完数据后调用的方法:发送数据给客户端 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 创建携带数据的ByteBuf对象 ByteBuf buf = Unpooled.copiedBuffer("hello netty client".getBytes()); // 把数据写入到通道中 ctx.writeAndFlush(buf); } /** * 异常捕获,当出现异常的时候会调用的方法 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.getMessage()); // 关闭通道 ctx.close(); } }
客户端
package com.my.netty.start; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @author zhupanlin * @version 1.0 * @description: Netty客户端 * @date 2024/1/28 10:45 */ public class NettyClient { public static void main(String[] args) { // 创建一个线程组用于事件循环 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { // 创建客户端启动对象 Bootstrap bootstrap = new Bootstrap(); // 设置相关参数 bootstrap.group(eventLoopGroup) // 使用NioSocketCHannel作为客户端的通道实现 .channel(NioSocketChannel.class) // 创建通道初始化对象并设置handler业务处理器 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加处理器,处理器里面是实现具体业务的 socketChannel.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("Netty客户端启动了"); // 同步阻塞地告知连接的服务器的地址,并启动客户端 ChannelFuture channelFuture = bootstrap.connect("localhost", 9090).sync(); // 阻塞等待完成操作后关闭通道 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { eventLoopGroup.shutdownGracefully(); } } }
客户端处理读写业务的handler
package com.my.netty.start; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.StandardCharsets; /** * @author zhupanlin * @version 1.0 * @description: TODO * @date 2024/1/28 10:51 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 当客户端完成连接服务器后调用该方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes(StandardCharsets.UTF_8)); ctx.writeAndFlush(buf); } /** * 当通道有读事件发生时调用的方法:读取服务器返回的数据 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("来自服务器" + ctx.channel().remoteAddress() + "的消息:" + buf.toString(StandardCharsets.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }