Netty转发数据

环境:win10、jdk17、springboot3

1.Netty概述

        Netty 是一个基于 Java 的异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端。它提供了对各种协议(如 HTTP、TCP、UDP)的支持,并通过其强大的网络编程能力,简化了网络应用的开发过程。

2.Netty使用

https://netty.io/wiki/user-guide-for-4.x.htmlicon-default.png?t=N7T8https://netty.io/wiki/user-guide-for-4.x.html

2.1 导入依赖

<!-- netty -->
<dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>

        <version>4.1.84.Final</version>
</dependency>

        如果有用到redis,记得去掉redis自带的netty

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>netty-transport</artifactId>
                    <groupId>io.netty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>netty-handler</artifactId>
                    <groupId>io.netty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>netty-common</artifactId>
                    <groupId>io.netty</groupId>
                </exclusion>
            </exclusions>
        </dependency>

2.2 线程工具类

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author luobei
 */
public class ThreadExecutor {
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
        8,                              // 核心线程数:8个线程
        16,                             // 最大线程数:16个线程
        30,                             // 空闲线程存活时间:30秒
        TimeUnit.SECONDS,               // 空闲线程存活时间的单位:秒
        new LinkedBlockingQueue<>(10),  // 任务队列:容量为10的无界阻塞队列
        new ThreadPoolExecutor.AbortPolicy()  // 拒绝策略:抛出异常
    );

    /**
     * 使用线程池执行任务
     */
    public static void execute(Runnable runnable) {
        EXECUTOR.execute(runnable);
    }
}

2.3 Netty 服务器启动类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;

@Slf4j
@Component
public class NettyServer {

    public void start(InetSocketAddress address) {
        //配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)  // 绑定线程池
                    .channel(NioServerSocketChannel.class)
                    .localAddress(address)
                    .childHandler(new NettyServerChannelInitializer())//编码解码
                    .option(ChannelOption.SO_BACKLOG, 12800)  //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  //保持长连接,2小时无数据激活心跳机制
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(address).sync();
            log.info("netty服务器开始监听端口:" + address.getPort());
            //关闭channel和块,直到它被关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

2.4 服务端初始化类

import io.netty.channel.ChannelInitializer;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.SocketChannel;

/**
 * @Author luobei
 **/
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {

        // 设置接收缓冲区大小
        channel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(2048));

        // 添加自定义的解码器
        channel.pipeline().addLast("decoder", new Decoder());
        // 添加自定义的编码器
        channel.pipeline().addLast("encoder", new Encoder());
        // 添加处理器
        channel.pipeline().addLast(new NettyServerHandler());
    }
}

2.5 解码器

import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;

 import java.util.List;

 /**
  * 解码器
  */

 public class Decoder extends ByteToMessageDecoder {

     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
         //创建字节数组,buffer.readableBytes可读字节长度
         byte[] b = new byte[buffer.readableBytes()];
         //复制内容到字节数组b
         buffer.readBytes(b);
         out.add(toHexString1(b));
     }

     private String toHexString1(byte[] b) {
         StringBuffer buffer = new StringBuffer();
         for (int i = 0; i < b.length; ++i) {
             buffer.append(toHexString1(b[i]));
         }
         return buffer.toString();
     }

     private String toHexString1(byte b) {
         String s = Integer.toHexString(b & 0xFF);
         if (s.length() == 1) {
             return "0" + s;
         } else {
             return s;
         }
     }
 }

2.6 编码器

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 编码器
 */
public class Encoder extends MessageToByteEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String s, ByteBuf byteBuf) throws Exception {
        //将16进制字符串转为数组
        byteBuf.writeBytes(CodeUtil.hexString2Bytes(s));
    }
    /**
     * @Title:hexString2Bytes
     * @Description:16进制字符串转字节数组
     * @param src 16进制字符串
     * @return 字节数组
     */
    public static byte[] hexString2Bytes(String src) {
        int l = src.length() / 2;
        byte[] ret = new byte[l];
        for (int i = 0; i < l; i++) {
            ret[i] = (byte) Integer.valueOf(src.substring(i * 2, i * 2 + 2), 16).byteValue();
        }
        return ret;
    }
}

2.7 netty消息处理类

import com.nettydemo.entity.NettyChannelDto;
import com.nettydemo.service.ProtocolFactoryService;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;

@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private static NettyServerHandler nettyServerHandler;

    @PostConstruct
    public void init() {
        nettyServerHandler = this;
    }

    /**
     * @param ctx
     * @DESCRIPTION: 有客户端连接服务器会触发此函数
     * @return: void
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //保存通道信息
        log.info(ctx.toString());
    }

    /**
     * @param ctx
     * @DESCRIPTION: 有客户端终止连接服务器会触发此函数
     * @return: void
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        delChannel(ctx, 1);
    }

    /**
     * @param ctx
     * @DESCRIPTION: 有客户端发消息会触发此函数
     * @return: void
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String cmd = msg.toString();
        log.info("客户端发消息:{}", cmd);
        /**
         *  下面可以解析数据,保存数据。
         */
        ThreadExecutor.execute(() -> {
            log.info(ctx+msg.toString());
            //处理数据
        });

    }

    private NettyChannelDto getNettyChannelDto(ChannelHandlerContext ctx){

        NettyChannelDto dto = NettyContextUtil.CHANNEL_MAP.get(ctx.channel().id());
        if(dto==null){
            return updateChannel(ctx);
        }
        return dto;
    }

    private NettyChannelDto updateChannel(ChannelHandlerContext ctx){
        //获取连接通道唯一标识
        ChannelId channelId = ctx.channel().id();
        NettyChannelDto dto = NettyContextUtil.CHANNEL_MAP.get(channelId);
        if(dto==null){
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            int clientPort = insocket.getPort();

            //有新的连接,将连接信息写入缓存
            NettyChannelDto nettyChannelDto = new NettyChannelDto();
            nettyChannelDto.setContext(ctx);
            nettyChannelDto.setClientIp(clientIp);
            nettyChannelDto.setClientPort(clientPort);
            NettyContextUtil.CHANNEL_MAP.put(channelId, nettyChannelDto);
            log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
            return nettyChannelDto;
        }
        return dto;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        String socketString = ctx.channel().remoteAddress().toString();

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("Client: " + socketString + " READER_IDLE 读超时");
                ctx.disconnect();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("Client: " + socketString + " WRITER_IDLE 写超时");
                ctx.disconnect();
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("Client: " + socketString + " ALL_IDLE 总超时");
                ctx.disconnect();
            }
        }
    }

    /**
     * @param ctx
     * @DESCRIPTION: 发生异常会触发此函数
     * @return: void
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("异常", cause.getMessage());
        delChannel(ctx, 2);
        ctx.close();
    }

    /**
     * 通道失去连接时调用
     * @param ctx
     * @param type 通道断开类型:1.正常断开(正常失去连接) 2.异常断开(服务端捕获到异常,断开连接)
     */
    public void delChannel(ChannelHandlerContext ctx, int type){
        switch (type){
            case 1://服务器正常断开连接

                InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();

                String clientIp = insocket.getAddress().getHostAddress();

                ChannelId channelId = ctx.channel().id();
                //有客户端断开连接,将连接信息移除
                log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
                break;
            case 2:
                //连接错误清除通道
        }
    }
}
import com.nettydemo.entity.NettyChannelDto;
import io.netty.channel.ChannelId;
import org.apache.logging.log4j.util.Strings;
import java.util.concurrent.ConcurrentHashMap;

public class NettyContextUtil {

    /**
     * 保存连接服务端的通道数量
     */
    public static final ConcurrentHashMap<ChannelId, NettyChannelDto> CHANNEL_MAP = new ConcurrentHashMap<>();
}

2.8 通道信息实体类

@Data
public class NettyChannelDto {

    //通道连接
    private ChannelHandlerContext context;
    //客户端ip
    private String clientIp;
    //端口
    private int clientPort;
    //回复命令
    private String cmd;
    //密码
    private String password;
}

2.9 netty客户端(用来转发消息)

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClient {

    private final String host;
    private final int port;

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void sendMessage(String message) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                }
            });

            ChannelFuture f = b.connect(host, port).sync();
            log.info("已连接到服务器: {}:{}", host, port);
            f.channel().writeAndFlush(message).sync();
            log.info("消息已发送: {}", message);
            f.channel().closeFuture().sync();
        } finally {
            log.info("正在关闭工作组");
            workerGroup.shutdownGracefully();
        }
    }
}

         用的地方直接调用方法

NettyClient nettyClient = new NettyClient(HOST, PORT);
try {
    nettyClient.sendMessage(msg);
} catch (Exception e) {
    throw new RuntimeException(e);
}

 2.10 netty启动监听器

        在 Spring Boot 应用程序启动后自动启动 Netty 服务器

@Component
public class NettyStartListener implements ApplicationRunner {

    @Autowired
    private NettyServer nettyServer;

    @Value("${nettyserver.ip}")
    private String SERVER_IP;

    @Value("${nettyserver.port}")
    private Integer SERVER_PORT;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        InetSocketAddress address=new InetSocketAddress(SERVER_IP,SERVER_PORT);
        nettyServer.start(address);
    }
}

相关推荐

  1. <span style='color:red;'>Netty</span>

    Netty

    2024-07-15 18:52:03      28 阅读
  2. <span style='color:red;'>Netty</span>

    Netty

    2024-07-15 18:52:03      29 阅读
  3. Netty服务端接收TCP链接数据

    2024-07-15 18:52:03       16 阅读
  4. netty使用

    2024-07-15 18:52:03       52 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-15 18:52:03       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-15 18:52:03       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-15 18:52:03       57 阅读
  4. Python语言-面向对象

    2024-07-15 18:52:03       68 阅读

热门阅读

  1. 【ROS2】测试

    2024-07-15 18:52:03       21 阅读
  2. 「Conda」在Linux系统中安装Conda环境管理器

    2024-07-15 18:52:03       18 阅读
  3. Python学习的第一天7.15

    2024-07-15 18:52:03       24 阅读
  4. 动态内存管理(C)

    2024-07-15 18:52:03       23 阅读
  5. 算法的时间复杂度和空间复杂度-概念

    2024-07-15 18:52:03       19 阅读
  6. Matlab

    Matlab

    2024-07-15 18:52:03      20 阅读
  7. C/C++指针&智能指针二

    2024-07-15 18:52:03       15 阅读