使用structured concurrency简化异步并发调用


用springboot的 kotlin demo,帮助理解structured concurrency简化异步并发调用的机制

准备http客户端

使用同时支持同步和异步调用的java.net.http.HttpClient

@Configuration
class RestTemplateConfig {
   
    @Bean
    fun httpClient(): HttpClient {
   
        val client: HttpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(3))
            .followRedirects(HttpClient.Redirect.NEVER) // CompletableFuture的默认回调线程池也是这个
            .executor(ForkJoinPool.commonPool())
            .build()
        return client
    }

    @Bean
    @Primary
    fun restTemplate(httpClient: HttpClient,
        builder: RestTemplateBuilder
    ): RestTemplate {
   
        // 使用springboot自动初始化的builder,已加载各种RestTemplateRequestCustomizer
        return builder.requestFactory(Supplier {
   
            JdkClientHttpRequestFactory(
                httpClient
            )
        }).build()
    }
}       

同步调用

@AutoConfigureMockMvc
@SpringBootTest
class ApplicationTests {
   
    private val log: Logger = LoggerFactory.getLogger(this::class.java)
    @Autowired
    private lateinit var httpClient: HttpClient
    suspend fun blockingGet(tag: String) = withContext(Dispatchers.IO) {
   
        // 在Dispatchers.IO线程池上执行
        log.info("$tag 同步http调用")
        var threadName1 = Thread.currentThread().name
        val req1 = HttpRequest.newBuilder()
            .GET().timeout(Duration.ofSeconds(10))
            .uri(URI.create("https://xxx"))
            .build()

        var resp: HttpResponse<String> = httpClient.send(req1) {
    info: ResponseInfo ->
            log.info("$tag Protocol: ${
     info.version()}, ${
     Thread.currentThread().name}")
            BodySubscribers.ofString(StandardCharsets.UTF_8)
        }
        log.info("$tag 等待异步http调用完成: ${
     resp.body()}")
        var threadName2 = Thread.currentThread().name
        // 证明IO调用期间,线程被阻塞而没有释放
        assertEquals(threadName1, threadName2)
        log.info("$tag $threadName1 -> $threadName2")
    }
}

异步调用

    suspend fun asyncGet(tag: String) = withContext(Dispatchers.IO) {
   
        // 在Dispatchers.IO线程池上执行
        log.info("$tag 异步http调用")
        var threadName1 = Thread.currentThread().name
        val req1 = HttpRequest.newBuilder()
            .GET().timeout(Duration.ofSeconds(10))
            .uri(URI.create("https://xxx"))
            .build()

        val future1: CompletableFuture<HttpResponse<String>> =
            httpClient.sendAsync(req1) {
    info: ResponseInfo ->
                log.info("$tag Protocol: ${
     info.version()}, ${
     Thread.currentThread().name}")
                BodySubscribers.ofString(StandardCharsets.UTF_8)
            }

        var resp: HttpResponse<String> = future1.await()
        // 上面调用了suspend方法,于是编译器把下面的代码包装到状态机的某个状态的回调里
        var threadName2 = Thread.currentThread().name
        // 观察到线程切换,证明await()这个suspend之后的代码是异步调用的
        log.info("$tag $threadName1 -> $threadName2")
        log.info("$tag 等待异步http调用完成: ${
     resp.body()}")
    }

structured concurrency用例与机制浅析

    @Test
    fun test1() {
   
        log.info("main begin: " + Thread.currentThread().toString())
        var dispatcher: ExecutorCoroutineDispatcher = ForkJoinPool.commonPool().asCoroutineDispatcher()
        var result1 = runBlocking(dispatcher) {
   
            // runBlocking内的代码全部在ForkJoinPool上执行,launch{}里的coroutine除外
            var threadName1 = Thread.currentThread().name
            repeat(3) {
   
                launch {
    blockingGet("blocking request" + it) }
            }
            repeat(3) {
   
                launch {
    asyncGet("async request" + it) }
            }
            // 调用一个suspend方法
            delay(2L)
            var threadName2 = Thread.currentThread().name
            delay(2L)
            var threadName3 = Thread.currentThread().name
            // 证明runBlocking一开始就不是在进入前的main线程执行;
            // 内部调用suspend前后的代码也不保证在同一个线程执行
            log.info("block end: $threadName1 -> $threadName2 -> $threadName3")
            "block result1"
        }
        // runBlocking阻塞调用它main线程到这里
        log.info(result1)
        log.info("main end: " + Thread.currentThread().toString())
    }

demo里3个blockingGet+3个asyncGet都是并发执行的。
但是blockingGet内部使用的是阻塞式API,实际长期占用线程来等待IO结果,并没有释放线程资源来达到提升并发能力的目的。
asyncGet内部实际发生了线程的释放和异步回调;kotlin编译器在coroutine内检测到对suspend方法的调用(就是future1.await()调用),就会生成匿名类/状态机,来分割调用前后的执行逻辑;因为这个future1的完成最终是依赖IOCP/ePOll等操作系统IO接口的事件通知,所以在等待过程中不占用/阻塞任何操作系统线程;通过打印的日志能观察到future1.await()调用前后的执行线程是不同的。

runBlocking{}则默默等待内部所有launch的coroutine完成。

补充说明

阻塞式http调用,实际也是依赖操作系统的IO事件,只是没释放等待前的线程

相关推荐

  1. 使用structured concurrency简化异步并发调用

    2024-01-08 20:32:02       52 阅读
  2. 异步并发怎么做?

    2024-01-08 20:32:02       12 阅读
  3. Nodejs-异步并发控制

    2024-01-08 20:32:02       9 阅读
  4. 使用Feign简化Spring Boot微服务间的调用

    2024-01-08 20:32:02       41 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-08 20:32:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-08 20:32:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-08 20:32:02       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-08 20:32:02       20 阅读

热门阅读

  1. Wargames与bash知识10

    2024-01-08 20:32:02       32 阅读
  2. kotlin take 和 drop

    2024-01-08 20:32:02       37 阅读
  3. Copilot在Pycharm的应用和示例

    2024-01-08 20:32:02       32 阅读
  4. python常见解包方式

    2024-01-08 20:32:02       32 阅读
  5. 参数校验注解使用- validator

    2024-01-08 20:32:02       46 阅读
  6. 预训练模型的分类,以及代表模型介绍

    2024-01-08 20:32:02       36 阅读
  7. 如何使用 CMake 来构建一个包含子目录的 C++ 项目

    2024-01-08 20:32:02       36 阅读
  8. C++多态

    C++多态

    2024-01-08 20:32:02      26 阅读
  9. Docker 容器数据卷

    2024-01-08 20:32:02       39 阅读