C++并发编程实战学习笔记
1.并发的基本概念
1.1 并发与并行
- 并发:计算机在一段时间内同时执行多个相互独立的任务;
- 并行:计算机在同一时刻同时执行多个相互独立任务;
1.2 并发分类
- (1) 任务切换:单位时间内对任务进行多次切换,进而让任务看起来是并行执行的;
- (2) 硬件并行:多核多CPU计算机通过硬件支持,实现在同一时刻多个任务地并行执行;
实际使用过程中,任务数目通常远大于计算机硬件并行所能支持的最大并行数,因此正常情况下并发采用的是硬件并行结合任务切换实现的。
1.3 并发方式
- (1) 多进程并发:将应用程序分为多个独立进程同时运行;
- (2) 多线程并发:单进程中运行多个线程,每个线程相互独立运行;
1.4 为什么使用并发
- (1) 分离关注点(SOC):将相关代码和无关代码分离,使程序更容易理解和测试,进而减少出错可能;
- (2) 性能:通过任务并行(task parallelism)和数据并行(data parallelism)两种方式,将任务/数据分成多个部分并行处理,进而提高性能,降低总体运行时间;
1.5 简单入门
- (1) 编写C++代码:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello, World!" << std::endl;
}
int main(int argc, char *argv[]) {
std::thread t(hello);
t.join();
return 0;
}
- (2) 编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread
>> ./HelloWorld
Hello, World!
2.线程管理
2.1 线程启动
使用C++线程库启动线程,就是构造 std::thread
对象,这里需要包含 <thread>
头文件,std::thread
可以通过有函数操作符的实例进行构造:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello, World!" << std::endl;
}
class Task {
public:
void operator()() const {
hello();
}
};
int main(int argc, char *argv[]) {
std::thread t((Task()));
t.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread
>> ./HelloWorld
Hello, World!
当函数对象传入线程构造中时,如果你传入了一个匿名对象 std::thread t(Task());
,而不是一个具名对象时,C++编译器会将其解析成函数声明,而不是类型对象的定义,处理方法为:(1) 再加一层括号;(2) 使用花括号初始化;
// 1.加一组括号
std::thread t((Task()));
// 2.花括号初始化
std::thread t{Task()};
当子线程启动后,可以使用 join()
和 detach()
来指定主线程是否要等待子线程结束后再退出:
* (1) join
:汇入,主线程需要等待子线程结束后再退出;
* (2) detach
:分离,让子线程在后台运行,主线程可以直接退出,无需等待子线程结束;
分离线程通常称为守护线程(daemon threads),这种线程特点是长时间运行,线程生命周期可能从应用起始到结束,可能在后台监视文件系统,也可能对缓存继进行清理,亦或对数据结构进行优化。
另外,分离线程的资源在执行完毕后立即被回收,因此无法通过常规方式来获取线程结束状态或结束时间,发后即忘(fire and forget)的任务就是使用分离线程。
2.2 传递参数
向可调用对象或函数传递参数很简单,只需要将这些参数作为 std::thread
构造函数的附加参数即可,需要注意的是:同临时变量一样,这些参数会拷贝至新线程的内存空间中,即使函数参数是引用形式,拷贝操作也会执行:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <string>
#include <thread>
// 1.传入函数指针
void Print(const std::string &str) {
std::cout << str << std::endl;
}
// 2.传入一个对象
class Task {
public:
void operator()(const std::string &str) const {
Print(str);
}
};
int main(int argc, char *argv[]) {
std::thread t1(Print, "Fuction, Hello, World!");
t1.join();
std::thread t2(Task(), "Object, Hello, World!");
t2.join();
// 3.Lambda表达式
std::thread t3(
[](int a, int b) {
int c = a + b;
std::cout << "Lambda call: " << a << "+" << b << " = " << c << std::endl;
}, 1, 2
);
t3.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
Fuction, Hello, World!
Object, Hello, World!
Lambda call: 1+2 = 3
2.3 线程数及线程标识
std::thread::hardware_concurrency()
可以获取并发线程数量,std::this_thread::get_id()
可以获取类型为 std::thread::id
类型的线程标识:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
// 1.传入函数指针
void Print(const std::string &str) {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "thread id: " << std::this_thread::get_id() << std::endl;
std::cout << str << std::endl;
}
// 2.传入一个对象
class Task {
public:
void operator()(const std::string &str) const {
Print(str);
}
};
int main(int argc, char *argv[]) {
std::cout << "numThreads: " << std::thread::hardware_concurrency() << std::endl;
std::thread t1(Print, "Fuction, Hello, World!");
std::thread t2(Task(), "Object, Hello, World!");
// 3.Lambda表达式
std::thread t3(
[](int a, int b) {
int c = a + b;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "thread id: " << std::this_thread::get_id() << std::endl;
std::cout << "Lambda call: " << a << "+" << b << " = " << c << std::endl;
}, 1, 2
);
t1.join();
t2.join();
t3.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
numThreads: 8
thread id: 140447977604864
Lambda call: 1+2 = 3
thread id: 140447994390272
Fuction, Hello, World!
thread id: 140447985997568
Object, Hello, World!
这里使用 std::this_thread::sleep_for()
函数来休眠5秒,防止任务执行太快,最后三个任务都分配给了同一个线程,打印结果就是三个打印都是同一个线程 id
。
3.竞态问题
3.1 数据竞态和条件竞态
数据竞态(Data Race):指多个线程同时访问一个内存位置,并且至少有一个线程在修改这个位置的情况,数据竞态会导致程序行为不可预测,因为线程执行顺序是不确定的,从而可能导致错误的结果或程序崩溃。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
int shared_num = 0;
void increment() {
shared_num++;
}
int main(int argc, char *argv[]) {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
shared number: 2
>> ./HelloWorld
shared number: 1
条件竞态(Race Condition):指程序输出依赖于线程执行的相对顺序,而这种顺序不可预测,条件竞态通常涉及多个线程间协调和同步问题,而不仅仅是内存访问问题。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
int shared_num = 0;
void increment() {
for (int i = 0; i < 10000000; ++i) {
shared_num++;
}
}
int main(int argc, char *argv[]) {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
shared number: 10374280
3.2 避免恶性条件竞态
- (1) 对数据结构采取某种保护机制,确保只有修改线程能看到不变量的中间状态,如,互斥量
std::mutex
,可以使用C++标准库为互斥量提供的RAII
模板类std::lock_guard
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
#include <mutex>
int shared_num = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 10000000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
shared_num++;
}
}
int main(int argc, char *argv[]) {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
- (2) 对数据结构和不变量进行修改,修改完的结构必须能完成一系列不可分割的变化(原子操作),也就保证了每个变量的状态,这就是无锁编程。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <atomic>
#include <thread>
int main() {
std::atomic<int> counter(0); // 声明一个原子整数
auto increment = [&]() {
for (int i = 0; i < 100000; ++i) {
counter++; // 原子地增加计数
}
};
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Final count: " << counter << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
Final count: 200000
3.3 使用互斥量
将访问共享数据的代码标记为互斥,这样,线程访问共享数据前,将数据锁住,访问结束后,再将数据解锁。通过锁机制,就可以保证在某个时刻,只有一个线程能访问共享资源。
在C++中实例化 std::mutex
创建互斥量实例,lock()
函数对互斥量上锁,unlock()
为互斥量解锁:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
#include <mutex>
int shared_num = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 10000000; ++i) {
mtx.lock();
shared_num++;
mtx.unlock();
}
}
int main(int argc, char *argv[]) {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
shared number: 20000000
上面直接使用 std::mutex
的接口,意味着必须确保每个函数出口都调用 unlock()
函数(包括异常情况),不然就会出现死锁情况。因此,更推荐的用法是使用C++标准库提供的 RAII
模板类 std::lock_guard
,在构造时对互斥量上锁,并在析构时自动对互斥量进行解锁,从而保证互斥量被正确解锁。
std::lock_guard
实现如下:
/** @brief A simple scoped lock type.
*
* A lock_guard controls mutex ownership within a scope, releasing
* ownership in the destructor.
*/
template<typename _Mutex>
class lock_guard
{
public:
typedef _Mutex mutex_type;
explicit lock_guard(mutex_type& __m) : _M_device(__m)
{ _M_device.lock(); }
lock_guard(mutex_type& __m, adopt_lock_t) noexcept : _M_device(__m)
{ } // calling thread owns mutex
~lock_guard()
{ _M_device.unlock(); }
lock_guard(const lock_guard&) = delete;
lock_guard& operator=(const lock_guard&) = delete;
private:
mutex_type& _M_device;
};
其实挺简单的,构造实例时对互斥量上锁,在析构实例时对互斥量解锁。在C++17中添加了新特性,称为模板参数推导,类似 std::lock_guard
这样简单的模板类型,其模板参数列表可省略,调用代码可省略为:
std::lock_guard lock(mtx);
类似的接口还有 std::unique_lock
及 C++17
标准提供的 std::scoped_lock
等。
3.4 死锁问题
死锁是指两个或多个线程或进程因争夺资源而下相互等待,导致所有参与者都无法继续执行的状态。例如典型的ABBA锁顺序问题引起的死锁问题:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
int shared_num = 0;
std::mutex mtx_a, mtx_b;
void func_a() {
std::lock_guard<std::mutex> lock_a(mtx_a);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::lock_guard<std::mutex> lock_b(mtx_b);
}
void func_b() {
std::lock_guard<std::mutex> lock_b(mtx_b);
std::this_thread::sleep_for(std::chrono::seconds(2));
std::lock_guard<std::mutex> lock_a(mtx_a);
}
int main(int argc, char *argv[]) {
std::thread ta(func_a);
std::thread tb(func_b);
ta.join();
tb.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
避免死锁的方法:
- (1) 避免嵌套锁:每个线程只持有一个锁,就不会产生死锁,当需要获取多个锁,使用
std::lock
来做这件事,避免产生死锁;
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
int shared_num = 0;
std::mutex mtx_a, mtx_b;
void func_a() {
// 同时加锁
std::lock(mtx_a, mtx_b);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
// 手动解锁
mtx_a.unlock();
mtx_b.unlock();
}
void func_b() {
// 同时加锁
std::lock(mtx_a, mtx_b);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
// 手动解锁
mtx_a.unlock();
mtx_b.unlock();
}
int main(int argc, char *argv[]) {
std::thread ta(func_a);
std::thread tb(func_b);
ta.join();
tb.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
shared number: 2
或者使用C++提供的 RAII
模板:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
int shared_num = 0;
std::mutex mtx_a, mtx_b;
void func_a() {
// 同时加锁
std::lock(mtx_a, mtx_b);
// adopt_lock:保证lock_a,lock_b生命周期结束后,会解锁
std::lock_guard<std::mutex> lock_a(mtx_a, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(mtx_b, std::adopt_lock);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
void func_b() {
// 同时加锁
std::lock(mtx_a, mtx_b);
// adopt_lock:保证lock_a,lock_b生命周期结束后,会解锁
std::lock_guard<std::mutex> lock_a(mtx_a, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(mtx_b, std::adopt_lock);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
}
int main(int argc, char *argv[]) {
std::thread ta(func_a);
std::thread tb(func_b);
ta.join();
tb.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
shared number: 2
- (2) 避免在持有锁时调用外部代码:代码是外部提供的,所以无法确定外部要做什么,在持锁情况下,若外部代码获取一个锁,就违背第一个指导意见,并造成死锁(有时无法避免)。出现错误的示例如下:
#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
std::mutex mtx;
// 模拟外部代码,可能会获取相同的锁
void externalCode() {
mtx.lock();
std::cout << "External code acquired the lock" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
mtx.unlock();
}
void threadFunction() {
mtx.lock();
std::cout << "Thread acquired the lock" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
// 持有锁时调用外部代码
externalCode();
mtx.unlock();
}
int main() {
std::thread t(threadFunction);
t.join();
return 0;
}
- (3) 使用固定顺序获取锁:当硬性要求获取两个或两个以上的锁,并不能使用
std::lock
单独操作来获取它们时,最好每个线程上,用固定顺序获取它们(锁)。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
int shared_num = 0;
std::mutex mtx_a, mtx_b;
void func_a() {
std::lock_guard<std::mutex> lock_a(mtx_a);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::lock_guard<std::mutex> lock_b(mtx_b);
}
void func_b() {
std::lock_guard<std::mutex> lock_a(mtx_a);
shared_num++;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::lock_guard<std::mutex> lock_b(mtx_b);
}
int main(int argc, char *argv[]) {
std::thread ta(func_a);
std::thread tb(func_b);
ta.join();
tb.join();
std::cout << "shared number: " << shared_num << std::endl;
return 0;
}
4.同步操作
前面了解了线程间保护共享数据的方法,当然,我们不仅想要保护数据,还想对单独的线程进行同步。例如,在第一个线程完成前,等待另一个线程执行完毕。
4.1 等待事件或条件
C++标准库对条件变量有两套实现:std::condition_variable
和 std::condition_variable_any
,这两个都包含在 <condition_variable>
头文件声明中。两者都需要与互斥量一起才能工作(互斥量是为了同步),前者仅能与 std::mutex
一起工作,而后者可以和合适的互斥量一起工作,因此 std::condition_variable_any
会更通用,但性能和系统资源使用方面会有更多开销,所以通常会将 std::condition_variable
作为首选类型,当对灵活性有要求时,才会考虑 std::condition_variable_any
。
使用 std::condition_variable
处理数据等待:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue; // 用于存储数据的队列
bool finished = false; // 标记生产是否结束
// 生产者函数
void producer(int id) {
for (int i = 0; i < 5; ++i) {
std::unique_lock<std::mutex> lock(mtx);
int data = id * 10 + i; // 生成数据
data_queue.push(data);
std::cout << "i: " << i << std::endl;
std::cout << "Producer " << id << " produced: " << data << std::endl;
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
cv.notify_one(); // 通知一个等待的消费者
// 释放锁
lock.unlock();
// 当前线程休眠,其它线程抢锁
std::this_thread::sleep_for(
std::chrono::milliseconds(100)); // 模拟生产耗时
}
{
std::lock_guard<std::mutex> lock(mtx);
finished = true; // 标记生产结束
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
cv.notify_all(); // 唤醒所有等待的消费者
}
}
// 消费者函数
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 先释放传入lock对象所持有锁,在不满足条件队列非空或生产结束时,
// 将当前线程挂起,直到另一个线程唤醒,即使当前线程被唤醒,
// wait函数也会再次检查条件,满足条件时,线程才会继续执行
cv.wait(lock, [] {
return !data_queue.empty() || finished;
});
if (data_queue.empty() && finished) {
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
break; // 如果队列为空且生产结束,则退出循环
}
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << id << " consumed: " << data << std::endl;
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
// 释放锁
lock.unlock();
// 当前线程休眠,其它线程抢锁
std::this_thread::sleep_for(
std::chrono::milliseconds(150)); // 模拟消费耗时
}
}
int main() {
std::thread producers[2];
std::thread consumers[2];
// 创建生产者线程
for (int i = 0; i < 2; ++i) {
producers[i] = std::thread(producer, i + 1);
}
// 创建消费者线程
for (int i = 0; i < 2; ++i) {
consumers[i] = std::thread(consumer, i + 1);
}
// 等待生产者线程结束
for (int i = 0; i < 2; ++i) {
producers[i].join();
}
// 等待消费者线程结束
for (int i = 0; i < 2; ++i) {
consumers[i].join();
}
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
i: 0
Producer 2 produced: 20
26 finished: false
Consumer 2 consumed: 20
54 finished: false
i: 0
Producer 1 produced: 10
26 finished: false
Consumer 2 consumed: 10
54 finished: false
i: 1
Producer 2 produced: 21
26 finished: false
Consumer 2 consumed: 21
54 finished: false
i: 1
Producer 1 produced: 11
26 finished: false
Consumer 1 consumed: 11
54 finished: false
i: 2
Producer 2 produced: 22
26 finished: false
i: 2
Producer 1 produced: 12
26 finished: false
Consumer 1 consumed: 22
54 finished: false
Consumer 1 consumed: 12
54 finished: false
i: 3
Producer 2 produced: 23
26 finished: false
i: 3
Producer 1 produced: 13
26 finished: false
Consumer 1 consumed: 23
54 finished: false
Consumer 1 consumed: 13
54 finished: false
i: 4
Producer 2 produced: 24
26 finished: false
Consumer 1 consumed: 24
54 finished: false
i: 4
Producer 1 produced: 14
26 finished: false
Consumer 2 consumed: 14
54 finished: false
35 finished: true
48 finished: true
48 finished: true
35 finished: true
生产者函数(producer):首先,线程会将数据放入 data_queue
中,使用 std::unique_lock
锁定互斥锁 mtx
,确保队列访问时线程安全的;然后,每次生产完数据后,调用 notify_one()
通知一个等待的消费者线程,生产者线程在生产完数据后会休眠一段时间来模拟生产耗时;最后,生产者生产完后,将 finished
标记为 true
,并调用 notify_all()
唤醒所有等待的消费者线程,以便它们可以检查队列是否为空并退出。
消费者函数(consumer):消费者线程从队列 data_queue
中取出数据并消费。首先,使用 std::unique_lock
锁定互斥锁 mtx
,确保队列访问是线程安全的;然后,使用 wait
等待队列非空或生产完成,若队列为空且生产完成,则退出循环;最后,消费者线程再消费完数据之间会休眠一段时间,模拟消费耗时。
主函数(main):首先,创建两个生产者线程和两个消费者线程;然后,等待所有生产者线程结束;最后等待所有消费者线程结束。
这里其实存在一个问题,当第一个线程执行完所有生产任务后,将 finished
置为 true
,而消费者这时恰好也消费完了第一个线程生产的内容,这时,消费者线程会退出,而第二个生产者线程产生的内容将不会被消费掉。改进方法:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: HelloWorld
*/
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <atomic>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue; // 用于存储数据的队列
bool finished = false; // 标记生产是否结束
constexpr int num_producers = 2; // 生产者线程数
constexpr int num_consumers = 2; // 消费者线程数
std::atomic<int> completed_producers{0}; // 完成生产的生产者计数
// 生产者函数
void producer(int id) {
for (int i = 0; i < 5; ++i) {
std::unique_lock<std::mutex> lock(mtx);
int data = id * 10 + i; // 生成数据
data_queue.push(data);
std::cout << "i: " << i << std::endl;
std::cout << "Producer " << id << " produced: " << data << std::endl;
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
cv.notify_one(); // 通知一个等待的消费者
lock.unlock();
// 当前线程休眠,其它线程抢锁
std::this_thread::sleep_for(
std::chrono::milliseconds(100)); // 模拟生产耗时
}
completed_producers++;
if (completed_producers == num_producers) {
std::lock_guard<std::mutex> lock(mtx);
finished = true; // 标记生产结束
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
cv.notify_all(); // 唤醒所有等待的消费者
}
}
// 消费者函数
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 先释放传入lock对象所持有锁,在不满足条件队列非空或生产结束时,
// 将当前线程挂起,直到另一个线程唤醒,即使当前线程被唤醒,
// wait函数也会再次检查条件,满足条件时,线程才会继续执行
cv.wait(lock, [] {
return !data_queue.empty() || finished;
});
if (data_queue.empty() && finished) {
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
break; // 如果队列为空且生产结束,则退出循环
}
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << id << " consumed: " << data << std::endl;
std::cout << __LINE__ << " finished: " << std::boolalpha << finished << std::endl;
lock.unlock();
// std::this_thread::sleep_for(
// std::chrono::milliseconds(150)); // 模拟消费耗时
}
}
int main() {
std::thread producers[2];
std::thread consumers[2];
// 创建生产者线程
for (int i = 0; i < num_producers; ++i) {
producers[i] = std::thread(producer, i + 1);
}
// 创建消费者线程
for (int i = 0; i < num_consumers; ++i) {
consumers[i] = std::thread(consumer, i + 1);
}
// 等待生产者线程结束
for (int i = 0; i < num_producers; ++i) {
producers[i].join();
}
// 等待消费者线程结束
for (int i = 0; i < num_consumers; ++i) {
consumers[i].join();
}
return 0;
}
这里加了一个完成生产的生产者数目计数,当所有生产者都完成生产后,才将生产任务完成标志 finished
设置为 true
,这样就能保证最终消费者线程正常完成消费后再退出的逻辑。
4.2 线程安全队列
设计通用队列时,需要花时间想想,那些操作需要添加到队列实现中去:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: Queue
*/
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
template <typename T>
class Queue {
public:
Queue() = default;
Queue(const Queue &other) {
std::lock_guard<std::mutex> lock(mtx_);
queue_ = other.queue_;
}
/// @brief 向队列中添加一个元素
/// @param value 待添加元素
void push(T value) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(value);
cv_.notify_one();
}
/// @brief 等待弹出一个元素,结果在引用变量中存储
/// @param value 弹出结果
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() {
return !queue_.empty();
});
value = queue_.front();
queue_.pop();
}
/// @brief 等待弹出一个元素,结果在返回值中存储
/// @return 弹出元素
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() {
return !queue_.empty();
});
std::shared_ptr<T> result(std::make_shared<T>(queue_.front()));
queue_.pop();
return result;
}
/// @brief 尝试从队列中弹出一个元素,结果在引用变量中存储
/// @param value 弹出结果
/// @return 是否成功弹出
bool try_pop(T &value) {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
/// @brief 尝试从队列中弹出一个元素,结果在返回值中存储
/// @return 弹出结果,nullptr表示弹出失败
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> result = std::make_shared<T>(queue_.front());
queue_.pop();
return result;
}
/// @brief 判空
/// @return 当前队列是否为空
bool empty() const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
private:
// empty()是const成员函数,拷贝构造函数中other形参为const引用
// 操作过程中都需要加锁,所以互斥量必须为mutable才行
mutable std::mutex mtx_;
std::queue<T> queue_;
std::condition_variable cv_;
};
// 生产者线程函数
void producer(Queue<int> &queue) {
for (int i = 0; i < 10; ++i) {
queue.push(i);
std::cout << "Produced: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
// 消费者线程函数
void consumer(Queue<int> &queue) {
for (int i = 0; i < 10; ++i) {
// std::shared_ptr<int> value = queue.wait_and_pop();
// std::cout << "Consumed: " << *value << std::endl;
int value = 0;
queue.wait_and_pop(value);
std::cout << "Consumed: " << value << std::endl;
}
}
int main(int argc, char *argv[]) {
Queue<int> queue;
std::thread producerThread(producer, std::ref(queue));
std::thread consumerThread(consumer, std::ref(queue));
producerThread.join();
consumerThread.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> Produced: Consumed: 00
Produced: 1
Consumed: 1
Produced: 2
Consumed: 2
Produced: 3
Consumed: 3
Produced: 4
Consumed: 4
Produced: 5
Consumed: 5
Produced: 6
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 9
Consumed: 9
4.3 future
C++中,std::future
是一个类模板,用于从异步操作中获取结果,它通常与 std::async
,std::packaged_task
或 std::promise
等异步编程机制一起使用。常见用法如下:
- (1) 获取异步操作结果:允许你等待一个异步操作完成,并获取其返回值。
- (2) 阻塞等待:可以使用
std::future::get
方法来阻塞当前线程,直到异步操作完成后返回。 - (3) 非阻塞式检查:通过
std::future::valid
方法检查是否关联了一个有效异步操作结果。 - (4) 异常处理:若异步操作抛出异常,
std::future::get
方法会重新抛出该异常,允许你在获取结果时进行异常处理。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: future
*/
#include <future>
#include <thread>
#include <iostream>
int compute() {
std::this_thread::sleep_for(std::chrono::seconds(2));
return 42;
}
int except() {
std::this_thread::sleep_for(std::chrono::seconds(2));
throw std::runtime_error("An error occurred in async task.");
return 666;
}
class A {
public:
A() = default;
~A() = default;
std::string Hello(const std::string &name) {
std::this_thread::sleep_for(std::chrono::seconds(2));
return "Hello, " + name;
}
private:
std::string content_;
};
int main(int argc, char *argv[]) {
{
// 1.启动异步操作
std::future<int> result = std::async(std::launch::async, compute);
std::cout << "Computing..." << std::endl;
if (result.valid()) {
std::cout << "Future is valid." << std::endl;
// 2.等待异步操作完成,获取结果
int value = result.get();
std::cout << "Result: " << value << std::endl;
} else {
std::cout << "Future is not valid." << std::endl;
}
}
{
// 1.启动异步操作
A a;
std::future<std::string> result = std::async(std::launch::deferred, &A::Hello, std::ref(a), "MirrorYuChen");
if (result.valid()) {
std::cout << "Future is valid." << std::endl;
// 2.等待异步操作完成,获取结果
std::string value = result.get();
std::cout << "Result: " << value << std::endl;
} else {
std::cout << "Future is not valid." << std::endl;
}
}
{
// 1.启动异步线程
std::future<int> result = std::async(std::launch::async, except);
try {
if (result.valid()) {
std::cout << "Future is valid." << std::endl;
// 2.等待异步操作完成,获取结果
int value = result.get();
std::cout << "Result: " << value << std::endl;
} else {
std::cout << "Future is not valid." << std::endl;
}
} catch (const std::exception &e) {
std::cout << "Exception catched: " << e.what() << std::endl;
}
}
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
Computing...
Future is valid.
Result: 42
Future is valid.
Result: Hello, MirrorYuChen
Future is valid.
Exception catched: An error occurred in async task.
和 std::thread
一样,std::async
允许添加额外的调用参数,向函数中传递参数。对象成员函数调用时:额外参数中,第一个参数为指向成员函数指针,第二个参数提供这个函数成员类的具体实例(通过指针或包装在 std::ref
中),剩余参数作为函数的参数传递。普通函数调用时,额外参数中,第一个参数为指向函数的指针,剩余参数为函数的参数传递。
future
的等待行为取决于 std::async
启动方式。std::async
有两种异步启动任务方式:同步(in-place)执行或异步(在新线程)执行,这种启动方式由 std::launch
策略决定:
- (1)
std::launch::async
:强制异步执行,std::async
会创建一个新线程来执行任务; - (2)
std::launch::deferred
:延迟执行,任务会在第一次调用std::future::get
时同步执行,即在调用get
的线程上执行。 - (3)
std::launch::async | std::launch::defered
:默认策略,根据系统资源情况,std::async
会决定时立即异步执行还是延迟执行。
4.4 promise
std::promise
是C++11引入的一个类模板,用于线程间传递数据或异常。它通常与 std::future
一起使用,std::promise
负责设置值或异常,而 std::future
用于获取这些值或异常。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: future
*/
#include <future>
#include <thread>
#include <iostream>
#include <chrono>
void producer(std::promise<int> prom) {
std::this_thread::sleep_for(std::chrono::seconds(2));
prom.set_value(1024);
std::cout << "value set by producer.\n";
}
void consumer(std::future<int> fut) {
try {
int value = fut.get();
std::cout << "Value received by consumer: " << value << "\n";
} catch (const std::exception &e) {
std::cout << "Exception: " << e.what() << "\n";
}
}
int main(int argc, char *argv[]) {
// 1.创建promise对象,用于存储将要传递的值或异常
std::promise<int> prom;
// 2.获取与std::promise对象关联的std::future对象
std::future<int> fut = prom.get_future();
// 3.启动生产者线程,对promise对象进行设值或异常
// std::move将prom和fut所有权从主线程转移到子线程
std::thread t1(producer, std::move(prom));
// 4.启动消费者线程,获取值或异常
std::thread t2(consumer, std::move(fut));
// 5.等待线程执行完毕
t1.join();
t2.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
value set by producer.
Value received by consumer: 1024
注意事项:
- (1) 一次性使用:
std::promise
和std::future
只能用于一次性值传递,一旦调用了set_value
或set_exception
,之后就不能再次设置值或异常; - (2) 异常处理:若
std::promise
被销毁时仍未设置值,保存数据将由异常代替; - (3) 线程安全:
std::promise
和std::future
之间通信是线程安全的,但需要注意在适当时候释放资源。
4.5 代码简化
首先看一个快速排序的函数式编程(Functional Programming,FP
)串行实现:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: sort
*/
#include <iostream>
#include <algorithm>
#include <list>
template <typename T>
std::list<T> sequential_quick_sort(std::list<T> input) {
if (input.empty()) {
return input;
}
std::list<T> result;
// 1.将input的begin()指向元素转移给result,插入位置为result.begin()之前
result.splice(result.begin(), input, input.begin());
// 2.将取result列表第一个元素作为基准元素
T const& pivot = *result.begin();
// 3.对input分区,小于基准元素的移动到列表的前部分,大于等于基准元素的移动到列表后部分
auto divide_point = std::partition(
input.begin(), input.end(), [&](T const& t) { return t < pivot; });
// 4.获取前半部分列表
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 5.递归排序小于基准元素部分列表
auto new_lower(sequential_quick_sort(std::move(lower_part)));
// 6.递归排序大于基准元素部分列表
auto new_higher(sequential_quick_sort(std::move(input)));
// 7.合并排序结果
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower);
return result;
}
int main(int argc, char* argv[]) {
std::list<int> a = { 23, 10, 30, 55, 46, 38};
auto result = sequential_quick_sort<int>(a);
for (const auto elem : result) {
std::cout << "elem: " << elem << std::endl;
}
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -std=c++11
>> ./HelloWorld
elem: 10
elem: 23
elem: 30
elem: 38
elem: 46
elem: 55
改造成异步执行的方式:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: future
*/
#include <future>
#include <iostream>
#include <algorithm>
#include <list>
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input) {
if (input.empty()) {
return input;
}
std::list<T> result;
// 1.将input的begin()指向元素转移给result,插入位置为result.begin()之前
result.splice(result.begin(), input, input.begin());
// 2.将取result列表第一个元素作为基准元素
T const& pivot = *result.begin();
// 3.对input分区,小于基准元素的移动到列表的前部分,大于等于基准元素的移动到列表后部分
auto divide_point = std::partition(
input.begin(), input.end(), [&](T const& t) { return t < pivot; });
// 4.获取前半部分列表
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 5.递归排序小于基准元素部分列表:异步执行方式
std::future<std::list<T>> new_lower(
std::async(¶llel_quick_sort<T>, std::move(lower_part))
);
// 6.递归排序大于基准元素部分列表:异步执行方式
std::future<std::list<T>> new_higher(
std::async(¶llel_quick_sort<T>, std::move(input))
);
// 7.合并排序结果
result.splice(result.end(), new_higher.get());
result.splice(result.begin(), new_lower.get());
return result;
}
int main(int argc, char* argv[]) {
std::list<int> a = { 23, 10, 30, 55, 46, 38, 100, 200, 320, 540};
auto result = parallel_quick_sort<int>(a);
for (const auto elem : result) {
std::cout << "elem: " << elem << std::endl;
}
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
elem: 10
elem: 23
elem: 30
elem: 38
elem: 46
elem: 55
elem: 100
elem: 200
elem: 320
elem: 540
5.内存模型与原子操作
5.1 内存模型与原子操作
C++11引入了一个正式的内存模型,用于定义程序中不同线程间内存操作如何互相影响。内存模型的主要目标是提供一种机制,使得程序员可以理解和预测多线程程序的行为,特别是在涉及共享数据时。
- 内存顺序(Memory Order):C++内存模型定义了不同内存顺序约束,用于控制原子操作顺序和可见性。常见内存顺序包括:
memory_order_relaxed
:最弱的顺序约束,不保证操作之间顺序;memory_order_release
:用于“发布”共享数据,可以确保其它线程读取该原子变量时,能看到写入的所有数据;memory_order_acquire
:用于“消费”共享数据,当一个线程读取某个原子变量后,使用memory_order_aquire
可以确保它能看到之前其它线程对该原子变量进行memory_order_release
操作前的所有写操作;memory_order_seq_cst
:最强顺序约束,提供严格的顺序保证;
原子操作指在多线程环境中不可分割的操作,确保任何时刻只有一个线程可以执行该操作。C++提供了 std::atomic
类模板来支持原子操作。
- 原子操作函数
store
:原子地存储一个值;load
:原子地加载一个值;exchange
:原子地交换两个值;compare_exchange_weak
和compare_exchage_strong
:原子地比较并交换值;
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: future
*/
#include <iostream>
#include <atomic>
#include <thread>
std::atomic<bool> ready {false};
std::atomic<int> data {0};
void producer() {
data.store(1024, std::memory_order_release);
ready.store(true, std::memory_order_release);
}
void consumer() {
while (!ready.load(std::memory_order_acquire));
int result = data.load(std::memory_order_acquire);
std::cout << "answer is: " << result << std::endl;
}
int main(int argc, char* argv[]) {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
answer is: 1024
5.2 标准原子类型
std::atomic_flag
是最简单地原子类型,这个类型对象可以在两个状态间切换:设置和清除。std::atomic_flag
类型对象必须被 ATOMIC_FLAG_INIT
初始化,初始化标志位是“清除”状态,这里没有选择,这个标志总是初始化为“清除”:
std::atomic_flag flag = ATOMIC_FLAG_INIT;
当标志对象已初始化,只能做三件事:销毁、清除或设置(查询之前的值)。这些操作对应的函数分别为:clear()
和 test_and_set()
。clear()
是一个存储操作,所以不能用 memory_order_acquire
或 memory_order_qcq_rel
语义,但 test_and_set()
是一个"读-改-写"操作,可以用于任何内存顺序。每个原子操作,默认内存序都是 memory_order_seq_cst
,例如:
flag.clear(std::memory_order_release);
bool x = flag.test_and_set();
std::atomic_flag
非常适合用作自旋锁,初始化标志是"清除",并且互斥量处于解锁状态。为了锁上互斥量,循环运行 test_and_set()
直到旧值为 false
,这意味着线程已经被设置成了 true
。解锁互斥量是一件很简单的事,将标志清除即可。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: spinlock
*/
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
class SpinLock {
public:
/// @brief 加锁
void lock() {
// 若flag_已被其它线程设置(即,已上锁),会进入一个忙等待循环(自旋),直至flag_被清除(解锁)
// memory_order_acquire确保加锁后,当前线程能看到其它线程对共享数据所有写操作
while (flag_.test_and_set(std::memory_order_acquire));
}
/// @brief 解锁
void unlock() {
// 清除flag_,表示锁已释放
// memory_order_release确保解锁前,当前线程对共享数据所有写操作对其它线程可见
flag_.clear(std::memory_order_release);
}
/// @brief 尝试加锁,若未加锁就上锁
/// @return 是否加锁成功
bool try_lock() {
return !flag_.test_and_set();
}
private:
std::atomic_flag flag_ {ATOMIC_FLAG_INIT};
};
SpinLock spin_lock;
int shared_data = 0;
void increment() {
spin_lock.lock();
shared_data++;
std::cout << "Incremented: " << shared_data << std::endl;
spin_lock.unlock();
}
void try_increment() {
spin_lock.lock();
if (spin_lock.try_lock()) {
shared_data++;
std::cout << "Incremented: " << shared_data << std::endl;
spin_lock.unlock();
} else {
std::cout << "Try increment failed.\n";
}
}
int main(int argc, char* argv[]) {
std::vector<std::thread> threads;
for (int i = 0; i < 10; ++i) {
threads.emplace_back(increment);
}
threads.emplace_back(try_increment);
for (auto &thread : threads) {
thread.join();
}
std::cout << "Final shared data: " << shared_data << std::endl;
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> ./HelloWorld
Incremented: 1
Incremented: 2
Incremented: 3
Incremented: 4
Incremented: 5
Incremented: 6
Incremented: 7
Incremented: 8
Incremented: 9
Incremented: 10
Try increment failed.
Final shared data: 10
6.参考资料
- [1] C++并发编程实战(第二版)
- [2] C++锁:概念、不同锁实现、死锁现象