netty创建tcp服务端+客户端

一.创建tcp服务端

@Slf4j
@Component
public class TcpServer {
   private static Channel serverChannel;
    /**
     * 创建服务端
     */
    @Async
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();  //接受客户端连接并将SocketChannel注册到worker线程
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //bossGroup开启的线程数为1,workerGroup开启的线程数为CPU核心数*2
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) //设置NIO线程组
                    .channel(NioServerSocketChannel.class) //设置channel类型为NioServerSocketChannel
                    .childHandler(new ChannelInitializer<SocketChannel>() { //初始化handler
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline cp = socketChannel.pipeline();
                            cp.addLast(new StringDecoder(StandardCharsets.UTF_8));
                            cp.addLast(new StringEncoder(StandardCharsets.UTF_8));
                            cp.addLast(new ServerHandler(port)); //添加自定义handler
                        }
                    })
                    //.option(ChannelOption.SO_BACKLOG, 128)          //处理TCP连接队列大小缓存,默认50
                    .childOption(ChannelOption.SO_KEEPALIVE, true); //心跳机制开启,保证长连接不被断开
            serverChannel = serverBootstrap.bind(port).sync().channel();//绑定端口并开始接收入站数据
            log.info("TcpServer start success...");
            serverChannel.closeFuture().await(); //阻塞当前线程,直到关闭服务器通道
        } catch (Exception e) {
            log.error("TcpServer start fall!");
            e.printStackTrace();
        } finally {
            //优雅的关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 监听类
     */
    private class ServerHandler extends SimpleChannelInboundHandler {

        private int port;// 当前 端口

        public ServerHandler(int port) {
            this.port = port;
        }


        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.error(cause.getMessage());
            cause.printStackTrace();
            //ctx.close();
        }

        /**
         * 接收消息
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
            String request = msg.toString();
            // 接收并处理来自客户端的消息
            log.info("{} ---> {}:{}", ctx.channel().remoteAddress(), this.port, request);
        }
    }


  /**
     * 发送数据
     *
     * @throws Exception
     */
    public void sendData(Object mgs) {
        try {
            serverChannel.writeAndFlush(mgs);
        } catch (Exception e) {
            log.error("发送消息错误!");
            e.printStackTrace();
        }

    }


}

下面展示一些 引用

   @Autowired
    TcpServer tcpServer;

tcpServer.bind(8080);
tcpServer.sendData(“我爱中国”);

二.创建tcp客户端,这里展示demo不是长连接了

1.下面展示一些 发送TCP给设备等待回复,35秒超时

 /**
     * 发送TCP给设备等待回复
     * 35秒超时
     *
     * @param ip
     * @param msg
     * @return
     */
    public String sendData(String ip,int port, Object msg) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        EventLoopGroup group = new NioEventLoopGroup();
        Channel channel = null;
        Response r = new Response();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            ByteBuf byteBuf = (ByteBuf) msg;
                            String response = byteBuf.toString(StandardCharsets.UTF_8);
                            log.info("response msg: " + response);
                            byteBuf.release();
                            r.setMsg(response);
                            ctx.close();
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            log.error("exceptionCaught ->" + cause.getMessage());
                            cause.printStackTrace();
                            // ctx.close();
                        }
                    });
            ChannelFuture future = bootstrap.connect(ip, port).sync();
            channel = future.channel();
            log.info("{}  <- {}", ip, msg.toString());
            Channel finalChannel = channel;
            Future<?> executorServiceFuture = executorService.schedule(() -> {
                // 检查Channel是否仍然是活动状态
                if (finalChannel.isActive()) {
                    finalChannel.close();
                }
            }, 35, TimeUnit.SECONDS);
            ByteBuf byteBuf = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8); // 将消息内容转换为ByteBuf
            channel.writeAndFlush(byteBuf); // 发送
            channel.closeFuture().await();//异步等待,通道关闭后会往下执行
            executorServiceFuture.cancel(true); // 立刻中断
            return r.getMsg();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            executorService.shutdown(); // 清理资源
            if (channel != null) channel.close();
            group.shutdownGracefully();
        }
    }

 class Response {
        private String msg;

        public Response() {
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }
    }

2.下面展示一些 自定义TCP客户端发送数据Socket方式


    /**
     * 自定义TCP客户端发送数据
     *
     * @param ip
     * @param port
     * @return
     */
    public void sendDataDraw(String ip, int port, Object msg) {
        try {
            Socket socket = new Socket(ip, port);
            OutputStream os = socket.getOutputStream();
            PrintStream ps = new PrintStream(os, true, "UTF-8");
            ps.println(msg);
            ps.flush();
            socket.close();
            log.info("{}  <- {}", ip, msg.toString());
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("连接不上设备!");
        }
    }

3.下面展示一些 发送tcp,不等待回复

/**
     * 发送tcp,不等待回复
     *
     * @param ip
     * @param port
     * @param msg
     */
    public void sendDataNoReply(String ip, int port, Object msg) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建客户端启动助手
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    });
            // 连接到服务器
            ChannelFuture future = bootstrap.connect(ip, port).sync();
            Channel channel = future.channel();
            ByteBuf buffer = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8);
            channel.writeAndFlush(buffer);
            log.info("{}:{}  <- {}", ip, port, msg.toString());
            // 等待一段时间以确保数据发送完成
            Thread.sleep(1000);
        } catch (Exception e) {
            log.error("发送tcp数据失败:", e);
            throw new RuntimeException("发送数据失败或连接不上");
        } finally {
            // 关闭线程组
            group.shutdownGracefully();
        }
    }

最后发送 字符串 和 16进制 方式

Object msg你要发的内容

      ByteBuf byteBuf = null;
            if (format.equals("str")) {
                byteBuf = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8); // 将消息内容转换为ByteBuf
            } else {
                byte[] bytes = hexString2Bytes(msg.toString());// 将16进制字符串转换为字节数组
                byteBuf = Unpooled.wrappedBuffer(bytes); // 使用字节数组创建ByteBuf
            }
            channel.writeAndFlush(byteBuf); // 发送


    public static byte[] hexString2Bytes(String src) {
        byte[] bytes = new byte[src.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            int index = i * 2;
            int j = Integer.parseInt(src.substring(index, index + 2), 16);
            bytes[i] = (byte) j;
        }
        return bytes;
    }

相关推荐

  1. netty创建tcp服务+客户

    2024-07-15 15:32:06       20 阅读
  2. netty udp创建服务+客户

    2024-07-15 15:32:06       24 阅读
  3. nettyTCP服务客户实现

    2024-07-15 15:32:06       44 阅读
  4. C++客户服务器TCP创建

    2024-07-15 15:32:06       51 阅读
  5. TCP服务器客户创建步骤

    2024-07-15 15:32:06       47 阅读
  6. springboot实现netty的websocket服务客户

    2024-07-15 15:32:06       55 阅读
  7. 异步TCP服务器;异步TCP客户

    2024-07-15 15:32:06       17 阅读
  8. python用websockets创建服务websocket创建客户

    2024-07-15 15:32:06       50 阅读
  9. QT TCP通讯客户服务

    2024-07-15 15:32:06       47 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-15 15:32:06       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-15 15:32:06       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-15 15:32:06       58 阅读
  4. Python语言-面向对象

    2024-07-15 15:32:06       69 阅读

热门阅读

  1. Python循环遍历:深入理解与实战应用

    2024-07-15 15:32:06       23 阅读
  2. 【Unity】制作简易计时器

    2024-07-15 15:32:06       20 阅读
  3. 文件读写的视频存在这里

    2024-07-15 15:32:06       16 阅读
  4. Spring常见问题一:IOC和DI

    2024-07-15 15:32:06       23 阅读
  5. 靖江美食元宇宙

    2024-07-15 15:32:06       21 阅读
  6. 数字IC前端设计经典书籍推荐

    2024-07-15 15:32:06       15 阅读
  7. 【TypeScript】 泛型(Generics)

    2024-07-15 15:32:06       17 阅读