线程同步
std::condition_variable
std::condition_variable
是 C++ 标准库 <condition_variable>
头文件中定义的一个类,用于线程同步。它提供了一种让线程等待直到特定条件被满足的方法,是实现生产者-消费者模型、线程间同步等复杂同步问题的有效工具。std::condition_variable
需要与一个互斥锁(如 std::mutex
)一起使用,以确保线程安全。
主要成员函数
1.wait: 让当前线程等待,直到被其他线程通过 notify_one
或 notify_all
唤醒。调用时需要先锁定一个与之关联的互斥锁,wait
会在等待期间解锁互斥锁,等待唤醒后再重新锁定。
void wait(std::unique_lock<std::mutex>& lock);
2.wait_until: 与 wait
类似,但是允许指定一个时间点,如果在到达该时间点前被唤醒或条件满足,则继续执行。
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
template <class Clock, class Duration, class Predicate>
bool wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<Clock, Duration>& timeout_time,
Predicate pred);
3.notify_one: 唤醒一个正在等待的线程,选择哪一个线程取决于线程调度器。
void notify_one();
4.notify_all: 唤醒所有等待的线程。
void notify_all();
使用模式
典型的使用模式包括一个条件变量、一个互斥锁和一个共享变量(条件):
- 保护共享变量:使用互斥锁确保对共享条件的访问是线程安全的。
- 等待条件:线程检查条件,如果不满足则调用
wait
函数等待。 - 通知条件改变:另一个线程修改条件后,调用
notify_one
或notify_all
唤醒等待线程。 - 重新检查条件:被唤醒的线程应该再次检查条件,因为可能是伪唤醒(spurious wakeup)。
示例代码
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool dataReady = false;
void producer() {
// 模拟数据准备
std::this_thread::sleep_for(std::chrono::seconds(2));
{
std::lock_guard<std::mutex> lock(mtx);
dataReady = true;
}
cv.notify_one(); // 数据准备好,通知消费者
}
void consumer() {
std::unique_lock<std::mutex> lock(mtx);
while(!dataReady) { // 条件不满足时等待
cv.wait(lock);
}
std::cout << "Data received!" << std::endl;
}
int main() {
std::thread p(producer);
std::thread c(consumer);
p.join();
c.join();
return 0;
}
在这个例子中,consumer
线程等待 dataReady
变量变为 true
,由 producer
线程准备数据后设置并通知。cv.wait
会释放锁并在条件满足或被唤醒时重新获取锁,确保了线程安全。
线程安全的队列
设计一个线程安全的队列
template<typename T>
class threadsafe_queue {
mutable std::mutex m; // 互斥量,用于保护队列操作的独占访问
std::condition_variable data_cond; // 条件变量,用于在队列为空时等待
std::queue<T> data_queue; // 实际存储数据的队列
public:
threadsafe_queue() {}
void push(T new_value) {
{
std::lock_guard<std::mutex>lk(m);
std::cout << "push:" << new_value << std::endl;
data_queue.push(new_value);
}
data_cond.notify_one();
}
// 从队列中弹出元素(阻塞直到队列不为空)
void pop(T& value) {
std::unique_lock<std::mutex>lk(m);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = data_queue.front();
std::cout << "pop:" << value << std::endl;
data_queue.pop();
}
// 从队列中弹出元素(阻塞直到队列不为空),并返回一个指向弹出元素的 shared_ptr
std::shared_ptr<T> pop() {
std::unique_lock<std::mutex>lk(m);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
std::shared_ptr<T>res(std::make_shared<T>(data_queue.front()));
std::cout << "pop:" << value << std::endl;
data_queue.pop();
return res;
}
bool empty()const {
std::lock_guard<std::mutex>lk(m);
return data_queue.empty();
}
};
测试运行,我们可以写一个经典的:生产者消费者模型,也就是一个线程 push
生产,一个线程 pop
消费。
void producer(threadsafe_queue<int>& q) {
for (int i = 0; i < 5; ++i) {
q.push(i);
}
}
void consumer(threadsafe_queue<int>& q) {
for (int i = 0; i < 5; ++i) {
int value{};
q.pop(value);
}
}
运行结果:
push:0
pop:0
push:1
pop:1
push:2
push:3
push:4
pop:2
pop:3
pop:4