netty粘包问题分析

Netty的粘包问题

在网络通信中,粘包问题是一种常见的情况,特别是在使用TCP协议进行数据传输时。本文将探讨Netty中的粘包问题及其解决方案,帮助读者更好地理解和应对这一问题。

1. 粘包问题介绍
1.1 什么是粘包问题

粘包问题是指发送端在将多个数据包连续发送到接收端时,接收端在接收数据时可能会将多个数据包粘合在一起,导致数据解析错误或丢失的情况。

1.2 粘包问题的影响

粘包问题可能导致接收端无法准确解析数据,从而影响系统的正常运行,甚至引发数据错误或丢失。

2. 粘包问题的原因
2.1 TCP协议的特点

TCP协议是面向连接的、可靠的数据传输协议,其特性会导致粘包问题的产生。

2.2 发送端造成的粘包

发送端在短时间内连续发送多个数据包时,可能会导致多个数据包在传输过程中被合并成一个数据包。

2.3 接收端造成的粘包

接收端在接收数据时,可能会将多个数据包一次性读取到缓冲区,造成粘包现象。

3. 粘包问题的解决方案
3.1 固定长度消息

通过设置固定长度的消息格式,使得每个数据包的长度固定,从而解决粘包问题。
服务端代码示例

ch.pipeline().addLast(new FixedLengthFrameDecoder(16));

服务端完整代码

// 示例代码
/**
 * @description: 固定长度方式解决粘包 问题服务端示例
 * @author: xz
 */
@Slf4j
public class NettyFixLengthServer {
    public static void main(String[] args) {
        new NettyFixLengthServer().start();
    }

    void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    .channel(NioServerSocketChannel.class)
                     //调整 netty 的接受缓冲区(byteBuf)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
                    .group(boss, worker)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //设置定长解码器,位置必须再LoggingHandler之前,作用让所有数据包长度固定(假设长度为 16 字节)
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                //会在连接channel建立成功后,触发active事件
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    log.debug("connected>>>>>>>>>>>>>>>> {}", ctx.channel());
                                    super.channelActive(ctx);
                                }
                                @Override
                                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                    log.debug("disconnect>>>>>>>>>>>>>>>> {}", ctx.channel());
                                    super.channelInactive(ctx);
                                }
                            });
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(8080);
            log.debug("{}>>>>>>>>>>>>>>>> binding...", channelFuture.channel());
            channelFuture.sync();
            log.debug("{}>>>>>>>>>>>>>>>> bound...", channelFuture.channel());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
            log.debug(">>>>>>>>>>>>>>>>stoped");
        }
    }
}

客户端代码

/**
 * @description: 固定长度方式解决粘包 问题客户端示例
 * @author: xz
 */
@Slf4j
public class NettyFixLengthClient {
    public static void main(String[] args) {
        send();
    }

    //剩余位置用下划线填充
    public static byte[] fill10Bytes(char c,int len){
        byte[] bytes = new byte[16];
        Arrays.fill(bytes, (byte) '_');
        for (int i = 0; i < len; i++) {
            bytes[i] = (byte) c;
        }
        System.out.println(new String(bytes));
        return bytes;
    }

    private static void send() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(worker);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    log.debug("connetted...");
                    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            log.debug("sending...");
                            //设置ByteBuf
                            ByteBuf buffer = ctx.alloc().buffer();
                            // 发送内容随机的数据包
                            Random r = new Random();
                            char c = '0';
                            for (int i = 0; i < 10; i++) {
                                //剩余位置用下划线填充方法
                                byte[] bytes =fill10Bytes(c,r.nextInt(16)+1);
                                c++;
                                //写入到ByteBuf
                                buffer.writeBytes(bytes);
                            }
                            ctx.writeAndFlush(buffer);
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            log.error("client error", e);
        } finally {
            worker.shutdownGracefully();
        }
    }
}

3.2 分隔符消息

在数据包之间添加特定的分隔符,通过分隔符来区分不同的数据包,从而解决粘包问题。

    String recieveStr = ""$$$$这是一条粘包的消息&&&&$$$$这是一条粘包的消息&&&&"";
        if (recieveStr.contains("&&&&$$$$")) {
            log.debug("发生粘包");
            //去掉首尾的分隔符
            recieveStr = recieveStr.substring(4, recieveStr.length() - 4);
            //将分离好的消息写入字符串数组
            String[] totalBag = recieveStr.split("&&&&$$$$");
            for (String thisTotalBag : totalBag) {                
                //每条消息的业务逻辑。。。        
           
            }
        }
3.3 消息头+消息体

在消息中添加头部信息,包括消息长度等信息,接收端根据头部信息来解析数据包,从而解决粘包问题。

3.4 使用消息结束标志

在每个数据包的末尾添加特定的结束标志,接收端根据结束标志来识别数据包的边界,从而解决粘包问题。netty使用tcp/ip协议传输数据。而tcp/ip协议是类似水流一样的数据传输方式。多次访问的时候有可能出现数据粘包的问题。客户端和服务器,协商定义一个特殊的分隔符号,分隔符号长度自定义。如:‘#’、‘KaTeX parse error: Expected group after '_' at position 1: _̲’、‘AA@’。在通讯的时候,只要没有发送分隔符号,则代表一条数据没有结束。bootstrap.childHandler()方法,在该方法中,定义了一个ChannelHandler[] acceptorHandlers = new ChannelHandler[3];数组的三个元素分别对应:自定义结束符、自定义编码、自定义处理器。最后,将这个数组作为参数进行传递。

  • 服务端代码
public class Server4Delimiter {
	// 监听线程组,监听客户端请求
	private EventLoopGroup acceptorGroup = null;
	// 处理客户端相关操作线程组,负责处理与客户端的数据通讯
	private EventLoopGroup clientGroup = null;
	// 服务启动相关配置信息
	private ServerBootstrap bootstrap = null;
	public Server4Delimiter(){
		init();
	}
	private void init(){
		acceptorGroup = new NioEventLoopGroup();
		clientGroup = new NioEventLoopGroup();
		bootstrap = new ServerBootstrap();
		// 绑定线程组
		bootstrap.group(acceptorGroup, clientGroup);
		// 设定通讯模式为NIO
		bootstrap.channel(NioServerSocketChannel.class);
		// 设定缓冲区大小
		bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
		// SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
		bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
			.option(ChannelOption.SO_RCVBUF, 16*1024)
			.option(ChannelOption.SO_KEEPALIVE, true);
	}
	public ChannelFuture doAccept(int port) throws InterruptedException{
		
		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
 
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// 数据分隔符, 定义的数据分隔符一定是一个ByteBuf类型的数据对象。
				ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
				ChannelHandler[] acceptorHandlers = new ChannelHandler[3];
				// 处理固定结束标记符号的Handler。这个Handler没有@Sharable注解修饰,
				// 必须每次初始化通道时创建一个新对象
				// 使用特殊符号分隔处理数据粘包问题,也要定义每个数据包最大长度。netty建议数据有最大长度。
				acceptorHandlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
				acceptorHandlers[1] = new StringDecoder(Charset.forName("UTF-8"));
				acceptorHandlers[2] = new Server4DelimiterHandler();
				ch.pipeline().addLast(acceptorHandlers);
			}
		});
		ChannelFuture future = bootstrap.bind(port).sync();
		return future;
	}
	public void release(){
		this.acceptorGroup.shutdownGracefully();
		this.clientGroup.shutdownGracefully();
	}
	
	public static void main(String[] args){
		ChannelFuture future = null;
		Server4Delimiter server = null;
		try{
			server = new Server4Delimiter();
			
			future = server.doAccept(9999);
			System.out.println("server started.");
			future.channel().closeFuture().sync();
		}catch(InterruptedException e){
			e.printStackTrace();
		}finally{
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			if(null != server){
				server.release();
			}
		}
	}
	
}
 
 
public class Server4DelimiterHandler extends ChannelHandlerAdapter {
	
	// 业务处理逻辑
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String message = msg.toString();
		System.out.println("from client : " + message);
		String line = "server message $E$ test delimiter handler!! $E$ second message $E$";
		ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
	}
	
 
	// 异常处理逻辑
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("server exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}
 
}
  • 客户端代码
public class Client4Delimiter {
	
	// 处理请求和处理服务端响应的线程组
	private EventLoopGroup group = null;
	// 服务启动相关配置信息
	private Bootstrap bootstrap = null;
	
	public Client4Delimiter(){
		init();
	}
	
	private void init(){
		group = new NioEventLoopGroup();
		bootstrap = new Bootstrap();
		// 绑定线程组
		bootstrap.group(group);
		// 设定通讯模式为NIO
		bootstrap.channel(NioSocketChannel.class);
	}
	
	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
 
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// 数据分隔符
				ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
				ChannelHandler[] handlers = new ChannelHandler[3];
				handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
				handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
				handlers[2] = new Client4DelimiterHandler();
				
				ch.pipeline().addLast(handlers);
			}
		});
		ChannelFuture future = this.bootstrap.connect(host, port).sync();
		return future;
	}
	
	public void release(){
		this.group.shutdownGracefully();
	}
	
	public static void main(String[] args) {
		Client4Delimiter client = null;
		ChannelFuture future = null;
		try{
			client = new Client4Delimiter();
			
			future = client.doRequest("localhost", 9999);
			
			Scanner s = null;
			while(true){
				s = new Scanner(System.in);
				System.out.print("enter message send to server > ");
				String line = s.nextLine();
				future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
				TimeUnit.SECONDS.sleep(1);
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if(null != client){
				client.release();
			}
		}
	}
	
}
 
 
 
public class Client4DelimiterHandler extends ChannelHandlerAdapter {
 
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try{
			String message = msg.toString();
			System.out.println("from server : " + message);
		}finally{
			// 用于释放缓存。避免内存溢出
			ReferenceCountUtil.release(msg);
		}
	}
 
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}
 
}
3.5 使用消息定长协议

利用Netty提供的LengthFieldBasedFrameDecoder等解码器,处理定长消息,从而解决粘包问题。

public class Client4Delimiter {
	
	// 处理请求和处理服务端响应的线程组
	private EventLoopGroup group = null;
	// 服务启动相关配置信息
	private Bootstrap bootstrap = null;
	
	public Client4Delimiter(){
		init();
	}
	
	private void init(){
		group = new NioEventLoopGroup();
		bootstrap = new Bootstrap();
		// 绑定线程组
		bootstrap.group(group);
		// 设定通讯模式为NIO
		bootstrap.channel(NioSocketChannel.class);
	}
	
	public ChannelFuture doRequest(String host, int port) throws InterruptedException{
		this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
 
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// 数据分隔符
				ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
				ChannelHandler[] handlers = new ChannelHandler[3];
				handlers[0] = new DelimiterBasedFrameDecoder(1024, delimiter);
				// 字符串解码器Handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串对象
				handlers[1] = new StringDecoder(Charset.forName("UTF-8"));
				handlers[2] = new Client4DelimiterHandler();
				
				ch.pipeline().addLast(handlers);
			}
		});
		ChannelFuture future = this.bootstrap.connect(host, port).sync();
		return future;
	}
	
	public void release(){
		this.group.shutdownGracefully();
	}
	
	public static void main(String[] args) {
		Client4Delimiter client = null;
		ChannelFuture future = null;
		try{
			client = new Client4Delimiter();
			
			future = client.doRequest("localhost", 9999);
			
			Scanner s = null;
			while(true){
				s = new Scanner(System.in);
				System.out.print("enter message send to server > ");
				String line = s.nextLine();
				future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
				TimeUnit.SECONDS.sleep(1);
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			if(null != future){
				try {
					future.channel().closeFuture().sync();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if(null != client){
				client.release();
			}
		}
	}
	
}
 
 
 
public class Client4DelimiterHandler extends ChannelHandlerAdapter {
 
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try{
			String message = msg.toString();
			System.out.println("from server : " + message);
		}finally{
			// 用于释放缓存。避免内存溢出
			ReferenceCountUtil.release(msg);
		}
	}
 
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exceptionCaught method run...");
		// cause.printStackTrace();
		ctx.close();
	}
 
}
4. Netty中的粘包问题与解决方案
4.1 Netty中的粘包问题

分析Netty中可能出现的粘包问题。

4.2 Netty中的解决方案

介绍Netty提供的解决粘包问题的方案,如DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等。

5. 注意事项与总结
5.1 注意事项

在处理粘包问题时需要注意的事项。

5.2 总结

总结解决粘包问题的方法及其优缺点,以及在实践中的应用。

通过本文的学习,读者可以更深入地了解Netty中的粘包问题及其解决方案,从而更好地应对实际开发中的网络通信挑战。

相关推荐

  1. netty问题分析

    2024-03-29 19:04:03       37 阅读
  2. 怎么处理问题

    2024-03-29 19:04:03       38 阅读
  3. TCP问题优化

    2024-03-29 19:04:03       43 阅读
  4. 为什么会出现这个问题

    2024-03-29 19:04:03       35 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-29 19:04:03       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-29 19:04:03       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-29 19:04:03       87 阅读
  4. Python语言-面向对象

    2024-03-29 19:04:03       96 阅读

热门阅读

  1. docker compose 启动 redis

    2024-03-29 19:04:03       46 阅读
  2. el-tree 树形控件

    2024-03-29 19:04:03       42 阅读
  3. Elasticsearch如何处理多个关键字查询

    2024-03-29 19:04:03       41 阅读
  4. kibana和elasticsearch的关系

    2024-03-29 19:04:03       41 阅读
  5. 关于vue 的生命周期的教程

    2024-03-29 19:04:03       43 阅读
  6. 速盾:vue可以用cdn吗

    2024-03-29 19:04:03       40 阅读
  7. lvgl移植以及使用记录(1)

    2024-03-29 19:04:03       51 阅读
  8. 基于单片机的智能交通灯控制系统设计

    2024-03-29 19:04:03       43 阅读
  9. git教程

    git教程

    2024-03-29 19:04:03      43 阅读