用springboot的 kotlin demo,帮助理解structured concurrency简化异步并发调用的机制
准备http客户端
使用同时支持同步和异步调用的java.net.http.HttpClient
@Configuration
class RestTemplateConfig {
@Bean
fun httpClient(): HttpClient {
val client: HttpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(3))
.followRedirects(HttpClient.Redirect.NEVER) // CompletableFuture的默认回调线程池也是这个
.executor(ForkJoinPool.commonPool())
.build()
return client
}
@Bean
@Primary
fun restTemplate(httpClient: HttpClient,
builder: RestTemplateBuilder
): RestTemplate {
// 使用springboot自动初始化的builder,已加载各种RestTemplateRequestCustomizer
return builder.requestFactory(Supplier {
JdkClientHttpRequestFactory(
httpClient
)
}).build()
}
}
同步调用
@AutoConfigureMockMvc
@SpringBootTest
class ApplicationTests {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
@Autowired
private lateinit var httpClient: HttpClient
suspend fun blockingGet(tag: String) = withContext(Dispatchers.IO) {
// 在Dispatchers.IO线程池上执行
log.info("$tag 同步http调用")
var threadName1 = Thread.currentThread().name
val req1 = HttpRequest.newBuilder()
.GET().timeout(Duration.ofSeconds(10))
.uri(URI.create("https://xxx"))
.build()
var resp: HttpResponse<String> = httpClient.send(req1) {
info: ResponseInfo ->
log.info("$tag Protocol: ${
info.version()}, ${
Thread.currentThread().name}")
BodySubscribers.ofString(StandardCharsets.UTF_8)
}
log.info("$tag 等待异步http调用完成: ${
resp.body()}")
var threadName2 = Thread.currentThread().name
// 证明IO调用期间,线程被阻塞而没有释放
assertEquals(threadName1, threadName2)
log.info("$tag $threadName1 -> $threadName2")
}
}
异步调用
suspend fun asyncGet(tag: String) = withContext(Dispatchers.IO) {
// 在Dispatchers.IO线程池上执行
log.info("$tag 异步http调用")
var threadName1 = Thread.currentThread().name
val req1 = HttpRequest.newBuilder()
.GET().timeout(Duration.ofSeconds(10))
.uri(URI.create("https://xxx"))
.build()
val future1: CompletableFuture<HttpResponse<String>> =
httpClient.sendAsync(req1) {
info: ResponseInfo ->
log.info("$tag Protocol: ${
info.version()}, ${
Thread.currentThread().name}")
BodySubscribers.ofString(StandardCharsets.UTF_8)
}
var resp: HttpResponse<String> = future1.await()
// 上面调用了suspend方法,于是编译器把下面的代码包装到状态机的某个状态的回调里
var threadName2 = Thread.currentThread().name
// 观察到线程切换,证明await()这个suspend之后的代码是异步调用的
log.info("$tag $threadName1 -> $threadName2")
log.info("$tag 等待异步http调用完成: ${
resp.body()}")
}
structured concurrency用例与机制浅析
@Test
fun test1() {
log.info("main begin: " + Thread.currentThread().toString())
var dispatcher: ExecutorCoroutineDispatcher = ForkJoinPool.commonPool().asCoroutineDispatcher()
var result1 = runBlocking(dispatcher) {
// runBlocking内的代码全部在ForkJoinPool上执行,launch{}里的coroutine除外
var threadName1 = Thread.currentThread().name
repeat(3) {
launch {
blockingGet("blocking request" + it) }
}
repeat(3) {
launch {
asyncGet("async request" + it) }
}
// 调用一个suspend方法
delay(2L)
var threadName2 = Thread.currentThread().name
delay(2L)
var threadName3 = Thread.currentThread().name
// 证明runBlocking一开始就不是在进入前的main线程执行;
// 内部调用suspend前后的代码也不保证在同一个线程执行
log.info("block end: $threadName1 -> $threadName2 -> $threadName3")
"block result1"
}
// runBlocking阻塞调用它main线程到这里
log.info(result1)
log.info("main end: " + Thread.currentThread().toString())
}
demo里3个blockingGet+3个asyncGet都是并发执行的。
但是blockingGet内部使用的是阻塞式API,实际长期占用线程来等待IO结果,并没有释放线程资源来达到提升并发能力的目的。
asyncGet内部实际发生了线程的释放和异步回调;kotlin编译器在coroutine内检测到对suspend方法的调用(就是future1.await()调用),就会生成匿名类/状态机,来分割调用前后的执行逻辑;因为这个future1的完成最终是依赖IOCP/ePOll等操作系统IO接口的事件通知,所以在等待过程中不占用/阻塞任何操作系统线程;通过打印的日志能观察到future1.await()调用前后的执行线程是不同的。
runBlocking{}则默默等待内部所有launch的coroutine完成。
补充说明
阻塞式http调用,实际也是依赖操作系统的IO事件,只是没释放等待前的线程