原理
为什么引入协程?
基于回调嵌套的异步代码难以维护。
C++20 为什么选择无栈协程?
有栈协程的介绍
有栈(stackful)协程通常的实现手段是在堆上提前分配一块较大的内存空间(比如 64K),也就是协程所谓的“栈”,参数、return address 等都可以存放在这个“栈”空间上。如果需要协程切换,那么通过 swapcontext 一类的形式来让系统认为这个堆上空间就是普通的栈,这就实现了上下文的切换。
有栈协程最大的优势就是侵入性小,使用起来非常简便,已有的业务代码几乎不需要做什么修改。
栈空间的限制
有栈协程的“栈”空间普遍是比较小的,在使用中有栈溢出的风险;而如果让“栈”空间变得很大,对内存空间又是很大的浪费。无栈协程则没有这些限制,既没有溢出的风险,也无需担心内存利用率的问题。
性能
有栈协程在切换时确实比系统线程要轻量,但是和无栈协程相比仍然是偏重的,C++20 coroutines 提案的作者 Gor Nishanov 在 CppCon 2018 上演示了无栈协程能做到纳秒级的切换。
无栈协程是普通函数的泛化
无栈协程是一个可以暂停和恢复的函数,是函数调用的泛化。一个函数的函数体(function body)是顺序执行的,执行完之后将结果返回给调用者,我们没办法挂起它并稍后恢复它,只能等待它结束。而无栈协程则允许我们把函数挂起,然后在任意需要的时刻去恢复并执行函数体,相比普通函数,协程的函数体可以挂起并在任意时刻恢复执行。
整体流程
Return_t foo () {
auto res = co_await awaiter;
co_return res ;
}
这个流程的驱动是由编译器根据协程函数生成的代码驱动的,它最终生成的是一百多行的代码, 无论是协程的创建还是 co_await 机制都是由这些代码实现的。
协程的创建
- 创建一个协程帧(coroutine frame),协程帧的内容:协程参数、局部变量、promise对象;
- 在协程帧里构建 promise 对象;
- 把协程的参数拷贝到协程帧里;
- 调用 promise.get_return_object() 返回给 caller 一个对象,即代码中的 Return_t 对象;
- 通过 promise 的 initial_suspend 和 final_suspend 返回类型来控制协程是否挂起(如果挂起协程,代码的控制权就会返回到caller,否则继续执行协程函数体(function body)),在 unhandled_exception 里处理异常,在 return_value 里保存协程返回值。
co_await 机制
co_await 操作符是 C++20 新增的一个关键字,co_await expr 一般表示等待一个惰性求值的任务,这个任务可能在某个线程执行,也可能在 OS 内核执行,什么时候执行结束不知道,为了性能,我们又不希望阻塞等待这个任务完成,所以就借助 co_await 把协程挂起并返回到 caller,caller 可以继续做事情,当任务完成之后协程恢复并拿到 co_await 返回的结果。
所以 co_await 一般有这几个作用:
- 挂起协程;
- 返回到 caller;
- 等待某个任务(可能是 lazy 的,也可能是非 lazy 的)完成之后返回任务的结果。
执行协程到函数的 co_await awaiter 时,是否需要等待某个任务?返回 false 表明希望等待,于是接着进入到 awaiter.wait_suspend(),并挂起协程,在 await_suspend 中创建了一个线程去执行任务(注意协程具柄传入到线程中了,以便后面在线程中恢复协程),之后就返回到 caller了,caller 这时候可以不用阻塞等待线程结束,可以做其它事情。注意:这里的 awaiter 同时也是一个 awaitable,因为它支持 co_await。
更多时候我们在线程完成之后才去恢复协程,这样可以告诉挂起等待任务完成的协程:任务已经完成了,现在可以恢复了,协程恢复后拿到任务的结果继续执行。
task test() {
std::cout << std::this_thread::get_id() << "\n";
co_await awaiter{};
std::cout << std::this_thread::get_id() << "\n";
}
输出结果显示 co_await 上面和下面的线程是不同的,以 co_await 为分界线,co_await 之上的代码在一个线程中执行,co_await 之下的代码在另外一个线程中执行,一个协程函数跨了两个线程,这就是协程的“魔法”。本质是因为在另外一个线程中恢复了协程,恢复后代码的执行就在另外一个线程中了。
协程恢复
当线程开始运行的时候恢复挂起的协程,这时候代码执行会回到协程函数继续执行,这就是最终的目标:在一个新线程中去执行协程函数的打印语句。
协程销毁
awaiter.final_suspend 决定是否要自动销毁协程,返回 std::suspend_never 就自动销毁协程,否则需要用户手动去销毁。
简单示例
#include <coroutine>
#include <iostream>
#include <thread>
namespace Coroutine {
struct task {
struct promise_type {
promise_type() {
std::cout << "1.create promie object\n";
}
task get_return_object() {
std::cout << "2.create coroutine return object, and the coroutine is created now\n";
return {std::coroutine_handle<task::promise_type>::from_promise(*this)};
}
std::suspend_never initial_suspend() {
std::cout << "3.do you want to susupend the current coroutine?\n";
std::cout << "4.don't suspend because return std::suspend_never, so continue to execute coroutine body\n";
return {};
}
std::suspend_never final_suspend() noexcept {
std::cout << "13.coroutine body finished, do you want to susupend the current coroutine?\n";
std::cout << "14.don't suspend because return std::suspend_never, and the continue will be automatically destroyed, bye\n";
return {};
}
void return_void() {
std::cout << "12.coroutine don't return value, so return_void is called\n";
}
void unhandled_exception() {}
};
std::coroutine_handle<task::promise_type> handle_;
};
struct awaiter {
bool await_ready() {
std::cout << "6.do you want to suspend current coroutine?\n";
std::cout << "7.yes, suspend becase awaiter.await_ready() return false\n";
return false;
}
void await_suspend(
std::coroutine_handle<task::promise_type> handle) {
std::cout << "8.execute awaiter.await_suspend()\n";
std::thread([handle]() mutable { handle(); }).detach();
std::cout << "9.a new thread lauched, and will return back to caller\n";
}
void await_resume() {}
};
task test() {
std::cout << "5.begin to execute coroutine body, the thread id=" << std::this_thread::get_id() << "\n";//#1
co_await awaiter{};
std::cout << "11.coroutine resumed, continue execcute coroutine body now, the thread id=" << std::this_thread::get_id() << "\n";//#3
}
}// namespace Coroutine
int main() {
Coroutine::test();
std::cout << "10.come back to caller becuase of co_await awaiter\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
- 任务(task)类型:包含
promise_type
,是协程的核心,用于管理协程的生命周期和状态。 - Awaiter:自定义的
Awaiter
实现了await_ready
,await_suspend
,await_resume
,用于控制协程的挂起和恢复。 - 协程函数
test
:使用co_await
关键字等待awaiter
的完成。 - 主函数:运行协程函数并等待它结束。
运行结果
运行过程
- 创建协程对象:在
main
函数中调用Coroutine::test()
时,创建了一个协程并执行。int main() { Coroutine::test(); ```
- 协程初始化:执行
promise_type
构造函数,打印 "1.create promise object",然后返回协程对象。promise_type() { std::cout << "1.create promise object\n"; } ```
- 返回协程对象:调用
get_return_object
,打印 "2.create coroutine return object, and the coroutine is created now"。task get_return_object() { std::cout << "2.create coroutine return object, and the coroutine is created now\n"; return {std::coroutine_handle<task::promise_type>::from_promise(*this)}; ```
- 初始挂起检查:调用
initial_suspend
,决定是否第一次挂起协程。这里由于返回std::suspend_never
,不会挂起,继续执行协程体。std::suspend_never initial_suspend() { std::cout << "3.do you want to suspend the current coroutine?\n"; std::cout << "4.don't suspend because return std::suspend_never, so continue to execute coroutine body\n"; return {}; } ```
- 执行协程体,等待 Awaiter:协程体开始执行,打印 "5.begin to execute coroutine body, the thread id=". 然后遇到
co_await awaiter{}
。std::cout << "5.begin to execute coroutine body, the thread id=" << std::this_thread::get_id() << "\n"; co_await awaiter{}; ```
- await_ready:
awaiter.await_ready()
决定是否立即挂起。因为返回false
,所以打印 "6.do you want to suspend current coroutine?" 和 "7.yes, suspend because awaiter.await_ready() return false"。然后挂起并执行await_suspend
。bool await_ready() { std::cout << "6.do you want to suspend current coroutine?\n"; std::cout << "7.yes, suspend because awaiter.await_ready() return false\n"; return false; } ```
- await_suspend:
await_suspend
创建新的线程,并在新线程中恢复协程执行。打印 "8.execute awaiter.await_suspend()" 和 "9.a new thread launched, and will return back to caller"。void await_suspend(std::coroutine_handle<task::promise_type> handle) { std::cout << "8.execute awaiter.await_suspend()\n"; std::thread([handle]() mutable { handle(); }).detach(); std::cout << "9.a new thread launched, and will return back to caller\n"; } ```
- 恢复到调用者线程:输出 "10.come back to the caller because of co_await awaiter",意味着主线程恢复执行。在
main
函数中马上休眠一秒。int main() { Coroutine::test(); std::cout << "10.come back to caller because of co_await awaiter\n"; std::this_thread::sleep_for(std::chrono::seconds(1)); } ```
- 在新线程中恢复执行:新线程中,协程恢复执行,打印 "11.coroutine resumed, continue execute coroutine body now, the thread id="。协程体继续执行到结束,调用
return_void
打印 "12.coroutine don't return value, so return_void is called"。std::cout << "11.coroutine resumed, continue execute coroutine body now, the thread id=" << std::this_thread::get_id() << "\n"; ```
- 最终挂起检查:调用
final_suspend
确定是否在协程完成后挂起。由于返回std::suspend_never
,不会挂起并立即清理。std::suspend_never final_suspend() noexcept { std::cout << "13.coroutine body finished, do you want to suspend the current coroutine?\n"; std::cout << "14.don't suspend because return std::suspend_never, and the coroutine will be automatically destroyed, bye\n"; return {}; } ```
参考
异步框架
使用流程
获取异步操作句柄
使用ExecutorService::of 获取执行句柄。
#include "pie/coroutine.hpp"
// 省略namespace pie
// 获取当前业务的异步操作句柄
auto businessHandler = ExecutorService::of("businessName");
businessHandler->post([]() {
printf(""); // post到有序队列中,单线程执行
});
businessHandler->toMain([]() {
printf(""); // post到主线程中
});
businessHandler->toGlobal([]() {
printf(""); // post到全局队列中,多线程无序执行
});
// 可以传入延时参数
businessHandler->post([]{
// 执行任务
}, 500); // 延迟500毫秒
异步操作(协程or普通函数)
拿到异步操作句柄后,使用post操作异步任务,同时支持异步化普通函数or协程 。
Task<void> myTask() {
printf("协程执行\n");
}
int main() {
auto handler = ExecutorService::of("你的业务名称");
// 普通函数
handler->post([]() {
printf("lambda异步\n");
});
// 协程函数
handler->post(myTask());
}
定义协程
// 创建一个空协程
Task<void> testVoid() {
co_return;
}
// 返回一个string的协程
Task<std::string> testString() {
co_return "xxxx";
}
等待协程
使用co_await等待某个协程执行完毕。
Task<void> testVoid() {
co_return;
}
Task<void> testAwait() {
co_await testVoid(); // 等待testVoid执行完毕
co_return;
}
协程调度
执行器
通过ExecutorService获取Handler,通过Handler可以向执行器传递任务(协程),在同一个Handler中,传递的任务都是单线程并发执行的。
Task<void> test() {
co_return;
}
Task<void> test2() {
co_return;
}
auto handler = ExecutorService::of("你的业务名称");
handler->post(test1);
handler->post(test2);
Looper
Looper * looper = Looper::myLooper(); //获取当前线程的looper
Looper * looper = Looper::mainLooper(); //获取主线程的looper
bool isMainThread = Looper::isMainThread(); // 判断是否主线程
调度器
如果不额外指示调度器,所有的Task都将在一个线程中执行,如果某个任务你想在具体的线程中执行,可以使用调度器。
协程调度设计
参考了go语言的G-M-P模型
- G - Goroutine,Go协程,是参与调度与执行的最小单位,每个 Goroutine 拥有自己的栈、程序计数器和其他状态。
- M - Machine,指的是系统级线程,M 是实际执行 Goroutine 的实体,在底层直接映射到操作系统线程。一个 M 可以绑定一个 P(Processor),然后由 P 调度多个 Goroutine 来运行。
- P - Processor,指的是逻辑处理器,P关联了的本地可运行G的队列(也称为LRQ)。
调度逻辑为:
- 线程M想运行任务就需得获取 P,即与P关联。
- 然从 P 的本地队列(LRQ)获取 G
- 若LRQ中没有可运行的G,M 会尝试从全局队列(GRQ)拿一批G放到P的本地队列,
- 若全局队列也未找到可运行的G时候,M会随机从其他 P 的本地队列偷一半放到自己 P 的本地队列。
- 拿到可运行的G之后,M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
对于单线程并发的任务(大多数情况)我们会在提供协程服务的同时尽可能的保证线程安全。
对于无序并发的任务,需要自行保证线程安全。
示例
单线程并发&顺序执行
使用Handler,即使在单线程下也能完成并发操作。
long long getCurrentTime() {
milliseconds ms = duration_cast<milliseconds>(
system_clock::now().time_since_epoch()
);
return ms.count();
}
Task<void> testTask(int id) {
printf("task start id=%d, time=%lld\n", id, getCurrentTime());
co_await coDelay(100);
printf("task end id=%d, time=%lld\n", id, getCurrentTime());
co_return;
}
auto handler = ExecutorService::of("sijian");
handler->post(testTask(1));
handler->post(testTask(2));
handler->post(testTask(3));
无序并发任务
当做全局线程池来使用:
// 使用lambda创建普通任务
ExecutorService::dispatch([]() {
printf("异步任务");
});
// 使用task创建协程任务
Task<void> taskSample() {
co_return;
}
ExecutorService::dispatch(taskSample());
将任意的异步回调封装成协程调用
假设你有一个函数,是用回调封装的
void functionWithCallback(std::function<void(int)> callback) {
std::thread([=]() {
callback(100);
}).detach();
}
我们可以将其封装成协程,而且无需担心线程安全问题
Task<int> testTask() {
Promise<int> promise;
printf("你的线程\n");
functionWithCallback([&](int data) {
printf("异步线程\n");
Promise.resove(data);
});
int ret = co_await promise;
printf("你的线程\n");
printf("%d\n", ret);
co_return ret;
}
通用异步iO(将任意的IO封装成协程)
IOTask<int> iotask() {
// io线程执行,耗时操作
co_return 0;
}
Task<int> testTask() {
Promise<int> promise;
printf("你的线程\n");
int ret = co_await iotask();
printf("你的线程\n");
printf("%d\n", ret);
co_return ret;
}
对比std::async
特性 | std::async |
协程(coroutines) |
---|---|---|
引入版本 | C++11 | C++20 |
工作机制 | 使用std::future 处理异步任务。 |
使用co_await , co_yield 和 co_return 关键字处理异步任务和任务协作。 |
返回类型 | std::future<T> |
自定义的 std::future 或其他返回类型 |
同步/异步控制 | 默认异步执行,但可以通过参数控制。 | 完全异步,由编译器和运行时处理。 |
优点 | 使用简单,易于理解,能直接提供异步处理能力。 | 更灵活、高效,不需要线程,能等待更细粒度的任务。 |
灵活性 | 灵活性有限,因为是基于线程的简单模型。 | 高度灵活,可以暂停和恢复执行,易于实现状态机模型等复杂操作。 |
性能 | 由于线程开销,大量任务时可能性能受限。 | 更高效,因为避免了线程开销,轻量级上下文切换。 |
资源管理 | 自动管理,隐藏了底层线程操作。 | 主动控制,能细粒度管理资源和任务。 |
错误处理 | 异常传播到 std::future::get() 捕获。 |
使用 co_await , co_return 可以直接处理异常。 |
可读性 | 类似传统函数调用,较高。 | 学习曲线陡峭,但对复杂任务流控制更直观。 |