netty使用http和webSocket

1:pom.xml配置

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.73.Final</version>
		</dependency>

2:Netty作为HTTP服务器

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class HttpServer {
   
    private final int port;

    public HttpServer(int port) {
   
        this.port = port;
    }

    public void start() throws Exception {
   
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
   
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
   
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
   
                    ChannelPipeline p = ch.pipeline();

                    // 添加 HTTP 编解码器和自定义的ChannelHandler
                    p.addLast(new HttpServerCodec());
                    p.addLast(new HttpObjectAggregator(1024 * 1024)); // 设置最大聚合大小为1MB
                    p.addLast(new LargeJsonHandler());
                }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,开始接受进来的连接
            ChannelFuture f = b.bind(port).sync();

            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } finally {
   
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
   
        new HttpServer(8080).start();
    }
}

class LargeJsonHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
   

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
   
        if (HttpUtil.is100ContinueExpected(request)) {
   
            send100Continue(ctx);
        }

        ByteBuf content = request.content();
        String jsonStr = content.toString(CharsetUtil.UTF_8);

        // 在这里对 JSON 数据进行处理
        System.out.println(jsonStr);

        // 发送响应
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.content().writeBytes("OK".getBytes(CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        ctx.writeAndFlush(response);
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
   
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }
	@Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
   
        // 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
        //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
   
        cause.printStackTrace();
        ctx.close();
    }
}

注意:如果发送的JSO数据如果大于1M,是会分包发送的,每次发送都会执行channelReadComplete方法,所以不可以关闭通道,发送完数据才执行channelRead0方法

3:Netty作为webSocket服务器

package com.example.slave.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

/**
 * @Description:
 * @Author: xu
 * @Data: 2024-2024/1/4-11
 * @Version: V1.0
 */
public class CustomWebSocket {
   
    private final int port;

    public CustomWebSocket(int port) {
   
        this.port = port;
    }

    public void start() throws Exception {
   
        //设置用于连接的boss组, 可在构造器中定义使用的线程数  监听端口接收客户端连接,一个端口一个线程,然后转给worker组
        //boss组用于监听客户端连接请求,有连接传入时就生成连接channel传给worker,等worker 接收请求 io多路复用,
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
   
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            //定义使用的通道 可以选择是NIO或者是OIO 代表了worker在处理socket channel时的不同情况。oio只能1对1, nio则没有1对1对关系
            //当netty要处理长连接时最好使用NIO,不然如果要保证效率 需要创建大量的线程,和io多路复用一致
                    .channel(NioServerSocketChannel.class)
                    //.channel(OioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
   
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
   
                            ChannelPipeline p = ch.pipeline();
                            // 添加 HTTP 编解码器和自定义的ChannelHandler
                            p.addLast(new HttpServerCodec());
                            p.addLast(new HttpObjectAggregator(1024 * 1024)); // 设置最大聚合大小为1MB
                            p.addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
                            p.addLast(new MyWebSocketHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口,开始接受进来的连接
            ChannelFuture f = b.bind(port).sync();

            // 等待服务器 socket 关闭
            f.channel().closeFuture().sync();
        } finally {
   
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
   
    public static ChannelGroup channelGroup;
    static {
   
        channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }
    //客户端与服务器建立连接的时候触发,
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
   
        System.out.println("与客户端建立连接,通道开启!");
        //添加到channelGroup通道组
        channelGroup.add(ctx.channel());
    }
    //客户端与服务器关闭连接的时候触发,
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
   
        System.out.println("与客户端断开连接,通道关闭!");
        channelGroup.remove(ctx.channel());
    }
	@Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
   
        // 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
   
        // 关闭发生异常的连接
        ctx.close();
    }


    //服务器接受客户端的数据信息,
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg){
   
        System.out.println("服务器收到的数据:" + msg.text());
        //sendMessage(ctx);
        sendAllMessage();
    }
    //给固定的人发消息
    private void sendMessage(ChannelHandlerContext ctx) {
   
        String message = "你好,"+ctx.channel().localAddress()+" 给固定的人发消息";
        ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
    }
    //发送群消息,此时其他客户端也能收到群消息
    private void sendAllMessage(){
   
        String message = "我是服务器,这里发送的是群消息";
        channelGroup.writeAndFlush( new TextWebSocketFrame(message));
    }
}

相关推荐

  1. netty使用httpwebSocket

    2024-01-07 16:58:01       41 阅读
  2. Nettywebsocket,如何部署Netty

    2024-01-07 16:58:01       37 阅读
  3. httpwebsocket区别

    2024-01-07 16:58:01       32 阅读
  4. Netty Websocket

    2024-01-07 16:58:01       23 阅读
  5. Springboot配置websockethttps使用 WebSocket 连接

    2024-01-07 16:58:01       34 阅读
  6. Netty HTTP

    2024-01-07 16:58:01       18 阅读
  7. netty websocket学习

    2024-01-07 16:58:01       69 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-07 16:58:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-07 16:58:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-07 16:58:01       82 阅读
  4. Python语言-面向对象

    2024-01-07 16:58:01       91 阅读

热门阅读

  1. 【C++学习笔记】C++多值返回写法

    2024-01-07 16:58:01       52 阅读
  2. 回车事件怎样绑定?

    2024-01-07 16:58:01       54 阅读
  3. Adobe Photoshop 快捷键

    2024-01-07 16:58:01       39 阅读
  4. [密码学][ecc]secp256k1

    2024-01-07 16:58:01       64 阅读
  5. Spring MVC之HandlerAdapter

    2024-01-07 16:58:01       56 阅读
  6. 神经网络中的重要概念

    2024-01-07 16:58:01       50 阅读
  7. hyperf 十九 数据库 二 模型

    2024-01-07 16:58:01       53 阅读