C++基于锁的并发数据结构设计
1 设计指南
设计并发数据结构时,需要考虑两个方面:(1) 确保访问安全;(2) 真正并发访问;
为保证数据结构是线程安全的,需要遵循以下几个关键点:
- (1) 互斥保护:所有对共享数据地访问都通过互斥锁进行保护,任何修改或读取共享数据的操作都必须在加锁保护下进行。如,使用
std::mutex
和std::lock_guard
来自动管理互斥量的加锁和解锁; - (2) 避免死锁:
- 锁顺序:确保所有线程以相同顺序获取锁,避免出现
ABBA
死锁问题; - 锁嵌套:尽量避免嵌套锁,若必须嵌套,确保嵌套顺序一致;
- 使用std::lock和std::lock_guard多个锁版本:当需要同时获取多个锁时,可以使用
std::lock
一次性获取所有锁,然后使用std::lock_guard
来管理这些锁的释放;
- 锁顺序:确保所有线程以相同顺序获取锁,避免出现
- (3) 条件变量:对于需要等待特定条件的操作,使用条件变量(
std::condition_variable
)来避免忙等(busy-waiting
),条件变量可以让线程在条件不满足时挂起,并在条件满足时被唤醒; - (4) 避免竞态条件:确保多线程环境下,操作顺序不会导致数据不一致,例如,避免没锁保护时对共享数据进行多步操作;
- (5) 锁粒度:
- 细粒度锁:如果可能,使用细粒度锁来提高并发度,细粒度锁能减少锁竞争,但会增加锁管理的复杂性;
- 粗粒度锁:对于简单数据结构或操作,使用粗粒度锁可以简化实现,并并发度低;
- (6) 线程局部存储:对于不需要共享的数据,可以使用线程局部存储(
thread_local
)来避免锁的使用。
2 线程安全的栈——使用锁
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: Stack
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <stack>
#include <exception>
struct empty_stack : std::exception {
const char *what() const throw() {
return "empty stack";
}
};
template <typename T>
class Stack {
public:
Stack() = default;
Stack(const Stack &other) {
std::lock_guard<std::mutex> lock(other.mtx_);
data_ = other.data_;
}
Stack &operator=(const Stack &) = delete;
void push(T value) {
std::lock_guard<std::mutex> lock(mtx_);
data_.push(std::move(value));
}
std::shared_ptr<T> pop() {
std::lock_guard<std::mutex> lock(mtx_);
if (data_.empty()) throw empty_stack();
std::shared_ptr<T> const result(std::make_shared<T>(std::move(data_.top())));
data_.pop();
return result;
}
void pop(T &value) {
std::lock_guard<std::mutex> lock(mtx_);
if (data_.empty()) throw empty_stack();
value = std::move(data_.top());
data_.pop();
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx_);
return data_.empty();
}
private:
std::stack<T> data_;
mutable std::mutex mtx_;
};
void producer(Stack<int> &stack, int num_elements) {
for (int i = 0; i < num_elements; ++i) {
stack.push(i);
std::cout << "Producer pushed " << i << "\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void consumer(Stack<int> &stack, int num_elements) {
for (int i = 0; i < num_elements; ++i) {
try {
int value;
stack.pop(value);
std::cout << "Consumer popped " << value << "\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
} catch (const empty_stack &e) {
std::cout << "Consumer caught exception: " << e.what() << "\n";
}
}
}
int main(int argc, char* argv[]) {
constexpr const int num_elements = 10;
Stack<int> stack;
std::thread producer_thread(producer, std::ref(stack), num_elements);
std::thread consumer_thread(consumer, std::ref(stack), num_elements);
producer_thread.join();
consumer_thread.join();
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> Producer pushed 0
Consumer popped 0
Producer pushed 1
Consumer popped 1
Producer pushed 2
Consumer popped 2
Producer pushed 3
Consumer popped 3
Producer pushed 4
Consumer popped 4
Producer pushed 5
Consumer popped 5
Producer pushed 6
Consumer popped 6
Producer pushed 7
Consumer popped 7
Consumer caught exception: empty stack
Consumer popped 8
Producer pushed 8
Producer pushed 9
互斥量 mtx_
对每个成员函数加锁保护,能保证同一时间,只有一个线程可以访问数据,所以能保证数据结构的“不变量”被破坏时,不会被其它线程看到,进而保证基本的线程安全。
empty()
和 pop()
成员函数间会存在潜在竞争,不过代码会在 pop()
函数上锁时,显式查询栈是否为空,所以这里竞争是非恶性的。于此同时,std::stack
的 top()
和 pop()
亮成员函数间是有潜在竞争的,所以这里封装后的 Stack
在其 pop()
函数中,使用互斥量将调用 top()
和 pop()
操作绑定为“不变量”,避免了二者间的数据竞态。
3.线程安全队列——使用锁和条件变量
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: Queue
*/
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <memory>
#include <condition_variable>
template <typename T>
class Queue {
public:
Queue() = default;
void push(T value) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(std::move(value));
cv_.notify_one();
}
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] {
return !queue_.empty();
});
value = std::move(queue_.front());
queue_.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] {
return !queue_.empty();
});
std::shared_ptr<T> result(
std::make_shared<T>(std::move(queue_.front()))
);
queue_.pop();
return result;
}
bool try_pop(T &value) {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return false;
}
value = std::move(queue_.front());
queue_.pop();
return true;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
private:
mutable std::mutex mtx_;
std::queue<T> queue_;
std::condition_variable cv_;
};
void producer(Queue<int> &queue, int num_elements, int id) {
for (int i = 0; i < num_elements; ++i) {
int value = id * num_elements + i;
queue.push(value);
std::cout << "Producer " << id << " pushed " << value << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(Queue<int> &queue, int num_elements, int id) {
for (int i = 0; i < num_elements; ++i) {
std::shared_ptr<int> value = queue.wait_and_pop();
std::cout << "Consumer " << id << " popped " << *value << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main(int argc, char* argv[]) {
constexpr const int num_elements = 10;
constexpr const int num_producers = 3;
constexpr const int num_consumers = 3;
Queue<int> queue;
std::vector<std::thread> producers;
for (int i = 0; i < num_producers; ++i) {
producers.emplace_back(std::thread(producer, std::ref(queue), num_elements, i));
}
std::vector<std::thread> consumers;
for (int i = 0; i < num_consumers; ++i) {
consumers.emplace_back(std::thread(consumer, std::ref(queue), num_elements * num_producers / num_consumers, i));
}
for (auto &t : producers) {
t.join();
}
for (auto &t : consumers) {
t.join();
}
return 0;
}
编译运行:
>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> Producer Producer 1 pushed 10
0 pushed 0
Producer 2 pushed 20
Consumer 0 popped 0
Consumer 1 popped 10
Consumer 2 popped 20
Producer Producer 1 pushed 11
0 pushed 1
Producer 2 pushed 21
Consumer 1 popped 1
Consumer 0 popped 11
Consumer 2 popped 21
Producer 1 pushed 12
Producer 0 pushed 2
Producer 2 pushed 22
Consumer 1 popped 12
Consumer 0 popped 2
Consumer 2 popped 22
Producer 1 pushed 13
Producer 0 pushed 3
Producer 2 pushed 23
Consumer 1 popped 13
Consumer 0 popped 3
Consumer 2 popped 23
Producer 1 pushed 14
Producer 0 pushed 4
Producer 2 pushed 24
Consumer 1 popped 14
Consumer 0 popped 4
Consumer 2 popped 24
Producer 1 pushed 15
Producer 0 pushed 5
Producer 2 pushed 25
Consumer 1 popped 15
Consumer 0 popped 5
Consumer 2 popped 25
Producer 1 pushed 16
Producer 0 pushed 6
Producer 2 pushed 26
Consumer 1 popped 16
Consumer 0 popped 6
Consumer 2 popped 26
Producer 1 pushed 17
Producer 0 pushed 7
Producer 2 pushed 27
Consumer 1 popped 17
Consumer 2 popped 7
Consumer 0 popped 27
Producer 1 pushed 18
Producer 0 pushed 8
Producer 2 pushed 28
Consumer 1 popped 18
Consumer 0 popped 8
Consumer 2 popped 28
Producer 1 pushed 19
Producer 0 pushed 9
Producer 2 pushed 29
Consumer 1 popped 19
Consumer 2 popped 29
Consumer 0 popped 9
与 Stack
相比,首先,Queue
的 push()
中调用了 cv_.notify_one()
,同时,会多出 wait_and_pop()
接口;然后,对于接口 try_pop()
,当容器为空时,Stack
会抛异常,而 Queue
会返回一个 bool
值。
对于 try_pop()
接口实现中,可以通过返回一个 bool
值或一个 shared
指针,通过判断返回值是否为 true
或返回指针是否为 nullptr
就能判断当前操作是否出现异常。所以为了正确实现函数的行为和语义,可以考虑以下三种方式:
(1) 抛出异常;
(2) 返回bool值;
(3) 返回指针;
wait_and_pop()
是等待队列中被放入数据的一个解决方案,相较于持续调用 empty()
,等待线程调用 wait_and_pop()
函数要好很多。cv_.wait()
会等待直到队列中有元素时,才会返回,这样就不用担心空队列地情况了,而且数据会一直被互斥锁保护。
当不止一个线程等待队列 push()
操作时,只会有一个线程收到由于 cv_.notify_one()
通知而继续工作。但是,当这个工作线程在 wait_and_pop()
中,如果构造 std::shared_ptr<T>
对象时抛出异常(如,内存分配失败),会出现问题:
- (1)
push()
函数中,向队列中每放入一个元素,就会调用notify_one()
唤醒一个线程来处理; - (2) 当异常发生时,
std::unique_lock
会自动释放互斥锁mtx_
,然后当前线程提前退出,不继续后续代码,包括弹出元素和返回结果,而其它线程仍然处于等待队列非空条件,无法被及时唤醒; - (3) 这样就会出现,队列中还有元素,而等待线程仍然处于等待状态,因为它们没有收到
cv_
的唤醒;
若这种情况是不可接受的,第一种改造方案是在 push()
接口中,将 notify_one()
改为 notify_all()
,唤醒所有工作线程,不过当大多数线程发现队列仍然为空时,又会耗费很多资源让线程重新进入休眠状态。
第二种改造方案是当有异常抛出时,让 wait_and_pop()
函数调用 notify_one()
,从而唤醒另外一个线程来处理当前任务:
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return std::shared_ptr<T>();
}
try {
std::shared_ptr<T> result (
std::make_shared<T>(queue_.front())
);
queue_.pop();
return result;
} catch (const std::bad_alloc &e) {
std::cerr << "Memory allocation failed: " << e.what() << std::endl;
cv_.notify_one();
return std::shared_ptr<T>();
}
}
第三种改造方案是将 std::shared_ptr<T>
初始化过程转移到 push()
中,队列中存储 std::shared_ptr<T>
实例,而非直接使用数值。将 std::shared_ptr<T>
拷贝到 std::queue<T>
中就不会抛出异常:
#include <mutex>
#include <queue>
#include <memory>
#include <condition_variable>
template <typename T>
class Queue {
public:
Queue() = default;
void push(T value) {
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(std::make_shared<T>(std::move(value)));
cv_.notify_one();
}
void wait_and_pop(T &value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] {
return !queue_.empty();
});
value = std::move(*queue_.front());
queue_.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] {
return !queue_.empty();
});
std::shared_ptr<T> result = queue_.front();
queue_.pop();
return result;
}
bool try_pop(T &value) {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return false;
}
value = std::move(*queue_.front());
queue_.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mtx_);
if (queue_.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> result = queue_.front();
queue_.pop();
return result;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx_);
return queue_.empty();
}
private:
mutable std::mutex mtx_;
std::queue<std::shared_ptr<T>> queue_;
std::condition_variable cv_;
};
4 线程安全链表——使用细粒度锁和条件变量
4.1 单线程版本链表
先看一个单线程版本的链表实现:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: List
*/
#include <memory>
template <typename T>
class List {
private:
struct Node {
T data;
std::unique_ptr<Node> next;
Node(T d) : data(std::move(d)) {
}
};
std::unique_ptr<Node> head_;
Node *tail_;
public:
List() = default;
List(const List &) = delete;
List &operator=(const List &) = delete;
std::shared_ptr<T> try_pop() {
if (!head_) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> const result (
std::make_shared<T>(std::move(head_->data))
);
std::unique_ptr<T> const old_head = std::move(head_);
// 读取head_的next指针
head_ = std::move(old_head->next);
if (!head_) {
tail_ = nullptr;
}
return result;
}
void push(T value) {
std::unique_ptr<Node> ptr(new Node(std::move(value)));
Node *const new_tail = ptr.get();
if (tail_) {
// 写入tail_的next指针
tail_->next = std::move(ptr);
} else {
head_ = std::move(ptr);
}
tail_ = new_tail;
}
};
链表中包含一个指向链表第一个元素的头指针 head_
,每个元素都会指向下一个元素。从链表中删除数据,其实就是将头指针指向下一个元素,并将之前头指针指向的元素取出即可。
向链表中添加元素是从尾部进行的,为了做到这点,队列中还有一个尾指针 tail_
,其指向链表的最后一个元素。新节点加入会改变尾指针的 next
指针指向新接入节点,然后新加入节点会成为新的尾指针。当链表为空时,头/尾指针都是 nullptr
。
代码中使用 std::unique_ptr<Node>
来管理节点,保证节点在删除时,不需要使用 delete
操作显式删除,这样的关系链表管理着从头到尾每个节点的原始指针。
虽然,这种实现对于单线程来说没有什么问题,但是当你在多线程尝试使用细粒度锁时,就会出现问题。给定实现中有两个数据项(head_
和 tail_
)需要保护,但是,即使用两个互斥量来保护头指针和尾指针也会出现问题:
push
方法可以同时修改头指针和尾指针,需要同时获取两个互斥量来保护这些操作;push
和try_pop
方法都可以访问next
指针,当链表中只有一个元素时,head_
和tail_
指向同一个节点,这个节点的next
指针需要保护;push
和try_pop
方法不能共享同一个锁来保护next
指针的访问,因为它们的操作是独立的,需要分别获取不同的锁。
4.2 分离数据实现并发
可以预分配一个虚拟节点(无数据),确保这个节点永远位于队列尾(tail_
一直都指向一个虚拟节点),用来分离头尾指针能访问的节点。
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: List
*/
#include <memory>
template <typename T>
class List {
private:
struct Node {
std::shared_ptr<T> data;
std::unique_ptr<Node> next;
};
std::unique_ptr<Node> head_;
Node *tail_;
public:
List() : head_(new Node()), tail_(head_.get()) {}
List(const List &) = delete;
List &operator=(const List &) = delete;
std::shared_ptr<T> try_pop() {
if (head_.get() == tail_) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> const result (head_->data);
std::unique_ptr<T> const old_head = std::move(head_);
head_ = std::move(old_head->next);
return result;
}
void push(T value) {
std::shared_ptr<T> data(
std::make_shared<T>(std::move(value))
);
std::unique_ptr<Node> ptr(new Node);
tail_->data = data;
Node *const new_tail = ptr.get();
tail_->next = std::move(ptr);
tail_ = new_tail;
}
};
对于空链表来说, head_
和 tail_
都属于虚拟指针,而非空指针,使用这个方法后,当链表为空时,try_pop()
不能访问 head_->next
,当添加新节点时(有真实节点),head_
和 tail_
现在指向不同节点,就不会在 head_->next
和 tail_next
上产生竞争,缺点就是必须添加一个间接层次的指针数据作为虚拟节点。
修改后,push()
只能访问 tail_
,而不能访问 head_
,这就是一个进步,try_pop()
可以访问 head_
和 tail_
,但 tail_
只需在最初进行比较,存在时间很短。这个重大的提升在于,虚拟节点意味着 try_pop()
和 push()
不能对同一个节点进行操作,这里已无需互斥了,那么只需要使用一个互斥量来保护 head_
和 tail_
即可,那么锁应该加在那里?
4.3 线程安全链表——细粒度锁版
我们的目的在于最大程度并发化,所以需要上锁时间尽可能短。push()
很简单,互斥量需要对 tail_
访问上锁,这意味着你需要对每个新分配节点上锁,还有对当前尾节点赋值时也需上锁,锁需要持续到函数结束时才能解开。
try_pop()
就不简单了,需要使用互斥量锁住 head_
,一直 head_
被弹出,但是访问 tail_
时,也需要一个尾互斥量,由于只需要访问 tail_
一次,所以最后将访问 tail_
的过程用函数封装一下:
/*
* @Author: chenjingyu
* @Date: 2025-01-06 14:34:36
* @Contact: 2458006466@qq.com
* @Description: List
*/
#include <memory>
#include <mutex>
template <typename T>
class List {
private:
struct Node {
std::shared_ptr<T> data;
std::unique_ptr<Node> next;
};
std::unique_ptr<Node> head_;
Node *tail_;
std::mutex head_mtx_;
std::mutex tail_mtx_;
private:
Node *get_tail() {
std::lock_guard<std::mutex> tail_lock(tail_mtx_);
return tail_;
}
std::unique_ptr<Node> pop_head() {
std::lock_guard<std::mutex> lock_head(head_mtx_);
if (head_.get() == get_tail()) {
return nullptr;
}
std::unique_ptr<Node> const old_head = std::move(head_);
head_ = std::move(old_head->next);
return old_head;
}
public:
List() : head_(new Node()), tail_(head_.get()) {}
List(const List &) = delete;
List &operator=(const List &) = delete;
std::shared_ptr<T> try_pop() {
std::unique_ptr<T> old_head = pop_head();
return old_head ? old_head : nullptr;
}
void push(T value) {
std::shared_ptr<T> data(
std::make_shared<T>(std::move(value))
);
std::unique_ptr<Node> ptr(new Node);
Node *const new_tail = ptr.get();
std::lock_guard<std::mutex> tail_lock(tail_mtx_);
tail_->data = data;
tail_->next = std::move(ptr);
tail_ = new_tail;
}
};
其它什么 wait_for_pop()
实现,基于锁设计更复杂的数据结构这些内容后面再啃吧,感觉快吐了。
5.参考资料
- [1] C++并发编程实战(第二版)