Webflux中的订阅关系

最近因为要提供流式接口,一直被webflux折磨,经常遇到各种问题,比如这段代码为什么没有运行,为什么是这个执行顺序,现在发现很多都跟不清楚订阅关系有关系。比如下例,我问了通义灵码这样一个问题:

Mono<PromptResult> result1 = WebClientUtil.getWebClient().post() .uri("http://" + SD_COMFYUI_URL + "/prompt") .body(Mono.just(requestBody), String.class) // 设置请求体 
.retrieve() .bodyToMono(String.class) .map(s -> { 
	PromptResult promptResult = JSON.parseObject(s, PromptResult.class, JSONReader.Feature.SupportSmartMatch); //支持下划线转驼峰 
	wsClient.setPromptId(promptResult.getPromptId()); 
	log.info("flux txtToImg 0 反序列化:{}", JSON.toJSONString(promptResult)); 
	return promptResult; 
});
Mono<SDHistoryResult.Image> result2 = result1.flatMap(p -> //flatMap to get promptId in result1 
WebClientUtil.getWebClient().get() .uri("http://" + SD_COMFYUI_URL + "/history/" + p.getPromptId()) .retrieve() .bodyToMono(String.class) .map(jsonString -> { Gson gson = new Gson(); Type type = new TypeToken<Map<String, SDHistoryResult>>() { }.getType(); log.info("flux txtToImg result2 {} || {}", "http://" + SD_COMFYUI_URL + "/history/" + p.getPromptId(), jsonString); Map<String, SDHistoryResult> map = gson.fromJson(jsonString, type); SDHistoryResult.Image image = map.get(p.getPromptId()).getOutputs().values().stream().findFirst().get().getImages().get(0); log.info("flux txtToImg result2 image {}", image); return image; }) .retryWhen(Retry.fixedDelay(5, Duration.ofMillis(1000))) );

Flux<String> flux = Flux.concat(result1.map(PromptResult::getPromptId), result2.map(s -> "result2: " + s)); 
return flux;

webflux中的这段代码会使result1运行两次,为什么

背景:代码在webflux 某个Controller方法,使用Flux.concat的目的是为了顺序执行,而在定义result2时使用result1.flatMap,是因为result2需要取result1的执行结果。

通义的回答:

在你提供的代码片段中,result1确实会被运行两次,这是因为Flux.concat操作在内部实际上会分别订阅并执行每一个Publisher(在这里是result1和result2)。当Flux.concat执行时,它首先订阅并完全消耗第一个Publisher(这里是result1.map(PromptResult::getPromptId)),然后才开始订阅并执行第二个Publisher(即result2.map(s -> "result2: " + s))。
问题在于result1在result2中通过flatMap被引用,这导致了result1的副作用(如网络请求)被执行了一次。然而,在Flux.concat中再次引用result1时,它将被独立地执行第二次,即使其内部的网络请求已经在result2的flatMap中执行过一次了。
为了避免这种情况,你可以使用share或者publish操作符来共享Mono的执行结果。具体来说,publish会创建一个可订阅的ConnectableFlux,而share则是在publish的基础上自动连接和断开以管理资源。使用share可以确保无论result1被多少次订阅,其内部的网络请求只执行一次。
修改后的代码如下:

Mono<PromptResult> result1 = ... // 你的原始代码

// 使用share()确保result1只执行一次
Mono<PromptResult> sharedResult1 = result1.share();

Mono<SDHistoryResult.Image> result2 = 
    sharedResult1.flatMap(p -> 
        // 其余代码与你的原始代码相同
    );

Flux<String> flux = Flux.concat(
    sharedResult1.map(PromptResult::getPromptId),
    result2.map(s -> "result2: " + s)
);
return flux;

通过这种方式,无论sharedResult1被订阅多少次,其内部的网络请求只会执行一次。

解决

大意是 Flux.concat导致result1被订阅了一次,result2 = result1.flatMap又导致result1被订阅了一次。而通义对share方法的解释是:

该函数是Reactive Streams的一种操作符,用于将一个Mono对象转换为一个热流(Hot Stream),即被多个订阅者订阅时,每个订阅者都会接收到相同的数据。

我用.share()方法试了下,解决了我的问题。

相关推荐

  1. Webflux订阅关系

    2024-07-17 01:44:04       20 阅读
  2. WebFlux使用WebSocket拓展功能分析

    2024-07-17 01:44:04       56 阅读
  3. 【软考】UML关联关系

    2024-07-17 01:44:04       23 阅读
  4. 关于@KafkaListener动态订阅消费topic调研

    2024-07-17 01:44:04       54 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-17 01:44:04       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-17 01:44:04       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-17 01:44:04       58 阅读
  4. Python语言-面向对象

    2024-07-17 01:44:04       69 阅读

热门阅读

  1. 在 Git 中把文件恢复到之前的提交

    2024-07-17 01:44:04       21 阅读
  2. 可用内存为什么可以超过实际内存

    2024-07-17 01:44:04       20 阅读
  3. 安全运营概述

    2024-07-17 01:44:04       22 阅读