SSE(Server Sent Event)实战(2)- Spring MVC 实现

一、服务端实现

  1. 使用 @RestController 注解创建一个控制器类(Controller)

  2. 创建一个方法来创建一个客户端连接,它返回一个 SseEmitter,处理 GET 请求并产生(produces)文本/事件流 (text/event-stream)

  3. 创建一个新的 SseEmitter, 保存它并从方法中返回

  4. 在另一个线程中异步发送事件, 先拿到保存的 SseEmitter 并根据需要多次调用调用SseEmitter.send()方法

  5. 完成事件发送, 调用 SseEmitter.complete() 方法

  6. 要异常完成发送事件,请调用 SseEmitter.completeWithError() 方法

/*
 * xxx.com
 * Copyright (C) 2021-2024 All Rights Reserved.
 */
package com.sse.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author xxx
 * @version SseController.java, v 0.1 2024-07-11 10:11
 */
@Slf4j
@RestController
@RequestMapping("/sse")
public class SseController {

    private static final Map<String, SseEmitter> SSE_EMITTER_MAP = new ConcurrentHashMap<>();

    /**
     * 创建连接
     */
    @GetMapping("/create-connect")
    public SseEmitter createConnect(@RequestParam("userId") String userId) {

        try {
            // 设置超时时间,0表示不过期。默认30秒
            SseEmitter sseEmitter = new SseEmitter(0L);

            // 注册回调
            sseEmitter.onCompletion(() -> removeSseConnection(userId, "SSE连接已关闭"));
            sseEmitter.onError(throwable -> removeSseConnection(userId, "SSE连接出现错误"));
            sseEmitter.onTimeout(() -> removeSseConnection(userId, "SSE连接超时"));

            SSE_EMITTER_MAP.put(userId, sseEmitter);

            log.info("创建了用户[{}]的SSE连接", userId);
            return sseEmitter;
        } catch (Exception e) {
            log.error("创建新的SSE连接异常,当前用户:" + userId, e);
            return null;
        }
    }

    /**
     * 发送消息
     */
    @GetMapping("/send-message")
    public void sendMessage(@RequestParam("userId") String userId, @RequestParam("message") String message) {

        SseEmitter sseEmitter = SSE_EMITTER_MAP.get(userId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event()
                        .name("message")
                        .data(message)
                        .reconnectTime(5000));
                log.info("给用户[{}]发送消息成功: {}", userId, message);
            } catch (Exception e) {
                log.error("给用户[{}]发送消息失败: {}", userId, e.getMessage(), e);
                // 如果发送失败,尝试从map中移除失效的SseEmitter
                removeSseConnection(userId, "发送消息失败");
            }
        } else {
            log.info("用户[{}]的SSE连接不存在或已关闭,无法发送消息", userId);
        }
    }

    private void removeSseConnection(String userId, String reason) {
        SSE_EMITTER_MAP.computeIfPresent(userId, (key, sseEmitter) -> {
            sseEmitter.complete();
            log.info("用户[{}]的SSE连接已移除,原因:{}", userId, reason);
            return null;
        });
    }
} 

二、客户端实现

创建多个 index.html文件,放在 static 目录下,用不同的浏览器打开,实现向多个用户推送的场景。
在这里插入图片描述


<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Demo</title>
    <script>        document.addEventListener('DOMContentLoaded', function () {
        var userId = "1";

        // 创建一个新的EventSource对象
        var source = new EventSource('http://localhost:8080/sse/create-connect?userId=' + userId);

        // 当连接打开时触发
        source.onopen = function (event) {
            console.log('SSE连接已打开');
        };

        // 当从服务器接收到消息时触发
        source.onmessage = function (event) {
            // event.data 包含服务器发送的文本数据
            console.log('接收到消息:', event.data);
            // 在页面上显示消息
            var messagesDiv = document.getElementById('messages');
            if (messagesDiv) {
                messagesDiv.innerHTML += '<p>' + event.data + '</p>'; // 直接使用event.data
            } else {
                console.error('未找到消息容器元素');
            }
        };

        // 当发生错误时触发
        source.onerror = function (event) {
            console.error('SSE连接错误:', event);
        };
    });
    </script>
</head>
<body>
<div id="messages">
    <!-- 这里将显示接收到的消息 -->
</div>
</body>
</html>

三、启动项目

  1. 运行 Spring 项目
    在这里插入图片描述
  2. 浏览器打开 index.html文件
    在这里插入图片描述
  3. 调用发送消息接口
    curl http://localhost:8080/sse/send-message\?userId\=1\&message\=test0001
    在这里插入图片描述

打开多个连接,用 userId 就可以实现向不同的用户推送的逻辑了。

四、总结

上面已经实现了最基本的消息推送需求,但是我们还可以思考一下实际生产中,我们还需要做哪些优化?

  1. 如果我们服务设置了最大连接时间,比如 3 分钟,而服务端又长时间没有消息推送给客户端,导致长连接被关闭该怎么办?
  2. 实际生产环境,我们肯定是多个实例部署,那么怎么保证创建连接和发送消息是在同一个实例完成?如果不是一个实例,就意味着用户没有建立连接,消息肯定发送失败。

下一篇博客,再做具体优化。

相关推荐

  1. springMVC实现细节

    2024-07-18 13:12:01       40 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-18 13:12:01       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-18 13:12:01       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-18 13:12:01       58 阅读
  4. Python语言-面向对象

    2024-07-18 13:12:01       69 阅读

热门阅读

  1. css3 中的伪类和伪元素

    2024-07-18 13:12:01       20 阅读
  2. 微信小程序学习之旅

    2024-07-18 13:12:01       21 阅读
  3. Docker安装ELK(简易版)

    2024-07-18 13:12:01       21 阅读
  4. Solana介绍

    2024-07-18 13:12:01       20 阅读
  5. 数学建模-Topsis(优劣解距离法)

    2024-07-18 13:12:01       21 阅读
  6. Ubuntu2204搭建ceph17

    2024-07-18 13:12:01       18 阅读
  7. npm安装依赖包的多种镜像及方法

    2024-07-18 13:12:01       21 阅读
  8. flutter高德地图release闪退

    2024-07-18 13:12:01       19 阅读
  9. 理解Go 语言中读写锁 RWMutex

    2024-07-18 13:12:01       18 阅读
  10. Vim(Vi IMproved)

    2024-07-18 13:12:01       20 阅读
  11. 新员工入职通识考试

    2024-07-18 13:12:01       20 阅读