C++基于锁的并发数据结构设计

MirrorYuChen
MirrorYuChen
发布于 2025-01-10 / 16 阅读
0
0

C++基于锁的并发数据结构设计

C++基于锁的并发数据结构设计

1 设计指南

​ 设计并发数据结构时,需要考虑两个方面:(1) 确保访问安全;(2) 真正并发访问;

​ 为保证数据结构是线程安全的,需要遵循以下几个关键点:

  • (1) 互斥保护:所有对共享数据地访问都通过互斥锁进行保护,任何修改或读取共享数据的操作都必须在加锁保护下进行。如,使用 std::mutexstd::lock_guard来自动管理互斥量的加锁和解锁;
  • (2) 避免死锁
    • 锁顺序:确保所有线程以相同顺序获取锁,避免出现 ABBA死锁问题;
    • 锁嵌套:尽量避免嵌套锁,若必须嵌套,确保嵌套顺序一致;
    • 使用std::lock和std::lock_guard多个锁版本:当需要同时获取多个锁时,可以使用 std::lock一次性获取所有锁,然后使用 std::lock_guard来管理这些锁的释放;
  • (3) 条件变量:对于需要等待特定条件的操作,使用条件变量(std::condition_variable)来避免忙等(busy-waiting),条件变量可以让线程在条件不满足时挂起,并在条件满足时被唤醒;
  • (4) 避免竞态条件:确保多线程环境下,操作顺序不会导致数据不一致,例如,避免没锁保护时对共享数据进行多步操作;
  • (5) 锁粒度
    • 细粒度锁:如果可能,使用细粒度锁来提高并发度,细粒度锁能减少锁竞争,但会增加锁管理的复杂性;
    • 粗粒度锁:对于简单数据结构或操作,使用粗粒度锁可以简化实现,并并发度低;
  • (6) 线程局部存储:对于不需要共享的数据,可以使用线程局部存储(thread_local)来避免锁的使用。

2 线程安全的栈——使用锁

/*
 * @Author: chenjingyu
 * @Date: 2025-01-06 14:34:36
 * @Contact: 2458006466@qq.com
 * @Description: Stack
 */
#include <iostream>
#include <thread>
#include <mutex>
#include <stack>
#include <exception>

struct empty_stack : std::exception {
  const char *what() const throw() {
    return "empty stack";
  }
};

template <typename T>
class Stack {
public:
  Stack() = default;
  Stack(const Stack &other) {
    std::lock_guard<std::mutex> lock(other.mtx_);
    data_ = other.data_;
  }

  Stack &operator=(const Stack &) = delete;

  void push(T value) {
    std::lock_guard<std::mutex> lock(mtx_);
    data_.push(std::move(value));
  }

  std::shared_ptr<T> pop() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (data_.empty()) throw empty_stack();
    std::shared_ptr<T> const result(std::make_shared<T>(std::move(data_.top())));
    data_.pop();
    return result;
  }

  void pop(T &value) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (data_.empty()) throw empty_stack();
    value = std::move(data_.top());
    data_.pop();
  }

  bool empty() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return data_.empty();
  }

private:
  std::stack<T> data_;
  mutable std::mutex mtx_;
};

void producer(Stack<int> &stack, int num_elements) {
  for (int i = 0; i < num_elements; ++i) {
    stack.push(i);
    std::cout << "Producer pushed " << i << "\n";
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
}

void consumer(Stack<int> &stack, int num_elements) {
  for (int i = 0; i < num_elements; ++i) {
    try {
      int value;
      stack.pop(value);
      std::cout << "Consumer popped " << value << "\n";
      std::this_thread::sleep_for(std::chrono::seconds(1));
    } catch (const empty_stack &e) {
      std::cout << "Consumer caught exception: " << e.what() << "\n";
    }
  }
}

int main(int argc, char* argv[]) {
  constexpr const int num_elements = 10;
  Stack<int> stack;

  std::thread producer_thread(producer, std::ref(stack), num_elements);
  std::thread consumer_thread(consumer, std::ref(stack), num_elements);

  producer_thread.join();
  consumer_thread.join();

  return 0;
}

​ 编译运行:

>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> Producer pushed 0
Consumer popped 0
Producer pushed 1
Consumer popped 1
Producer pushed 2
Consumer popped 2
Producer pushed 3
Consumer popped 3
Producer pushed 4
Consumer popped 4
Producer pushed 5
Consumer popped 5
Producer pushed 6
Consumer popped 6
Producer pushed 7
Consumer popped 7
Consumer caught exception: empty stack
Consumer popped 8
Producer pushed 8
Producer pushed 9

​ 互斥量 mtx_对每个成员函数加锁保护,能保证同一时间,只有一个线程可以访问数据,所以能保证数据结构的“不变量”被破坏时,不会被其它线程看到,进而保证基本的线程安全。

​ empty()pop()成员函数间会存在潜在竞争,不过代码会在 pop()函数上锁时,显式查询栈是否为空,所以这里竞争是非恶性的。于此同时,std::stacktop()pop()亮成员函数间是有潜在竞争的,所以这里封装后的 Stack在其 pop()函数中,使用互斥量将调用 top()pop()操作绑定为“不变量”,避免了二者间的数据竞态。

3.线程安全队列——使用锁和条件变量

/*
 * @Author: chenjingyu
 * @Date: 2025-01-06 14:34:36
 * @Contact: 2458006466@qq.com
 * @Description: Queue
 */
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <memory>
#include <condition_variable>


template <typename T>
class Queue {
public:
  Queue() = default;

  void push(T value) {
    std::lock_guard<std::mutex> lock(mtx_);
    queue_.push(std::move(value));
    cv_.notify_one();
  }

  void wait_and_pop(T &value) {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] {
      return !queue_.empty();
    });
    value = std::move(queue_.front());
    queue_.pop();
  }

  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] {
      return !queue_.empty();
    });
    std::shared_ptr<T> result(
      std::make_shared<T>(std::move(queue_.front()))
    );
    queue_.pop();
    return result;
  }

  bool try_pop(T &value) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) {
      return false;
    }
    value = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  bool empty() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return queue_.empty();
  }

private:
  mutable std::mutex mtx_;
  std::queue<T> queue_;
  std::condition_variable cv_;
};

void producer(Queue<int> &queue, int num_elements, int id) {
  for (int i = 0; i < num_elements; ++i) {
    int value = id * num_elements + i;
    queue.push(value);
    std::cout << "Producer " << id << " pushed " << value << "\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
}

void consumer(Queue<int> &queue, int num_elements, int id) {
  for (int i = 0; i < num_elements; ++i) {
    std::shared_ptr<int> value = queue.wait_and_pop();
    std::cout << "Consumer " << id << " popped " << *value << "\n";
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
}

int main(int argc, char* argv[]) {
  constexpr const int num_elements = 10;
  constexpr const int num_producers = 3;
  constexpr const int num_consumers = 3;
  Queue<int> queue;

  std::vector<std::thread> producers;
  for (int i = 0; i < num_producers; ++i) {
    producers.emplace_back(std::thread(producer, std::ref(queue), num_elements, i));
  }

  std::vector<std::thread> consumers;
  for (int i = 0; i < num_consumers; ++i) {
    consumers.emplace_back(std::thread(consumer, std::ref(queue), num_elements * num_producers / num_consumers, i));
  }

  for (auto &t : producers) {
    t.join();
  }

  for (auto &t : consumers) {
    t.join();
  }

  return 0;
}

​ 编译运行:

>> g++ HelloWorld.cc -o HelloWorld -lpthread -std=c++11
>> Producer Producer 1 pushed 10
0 pushed 0
Producer 2 pushed 20
Consumer 0 popped 0
Consumer 1 popped 10
Consumer 2 popped 20
Producer Producer 1 pushed 11
0 pushed 1
Producer 2 pushed 21
Consumer 1 popped 1
Consumer 0 popped 11
Consumer 2 popped 21
Producer 1 pushed 12
Producer 0 pushed 2
Producer 2 pushed 22
Consumer 1 popped 12
Consumer 0 popped 2
Consumer 2 popped 22
Producer 1 pushed 13
Producer 0 pushed 3
Producer 2 pushed 23
Consumer 1 popped 13
Consumer 0 popped 3
Consumer 2 popped 23
Producer 1 pushed 14
Producer 0 pushed 4
Producer 2 pushed 24
Consumer 1 popped 14
Consumer 0 popped 4
Consumer 2 popped 24
Producer 1 pushed 15
Producer 0 pushed 5
Producer 2 pushed 25
Consumer 1 popped 15
Consumer 0 popped 5
Consumer 2 popped 25
Producer 1 pushed 16
Producer 0 pushed 6
Producer 2 pushed 26
Consumer 1 popped 16
Consumer 0 popped 6
Consumer 2 popped 26
Producer 1 pushed 17
Producer 0 pushed 7
Producer 2 pushed 27
Consumer 1 popped 17
Consumer 2 popped 7
Consumer 0 popped 27
Producer 1 pushed 18
Producer 0 pushed 8
Producer 2 pushed 28
Consumer 1 popped 18
Consumer 0 popped 8
Consumer 2 popped 28
Producer 1 pushed 19
Producer 0 pushed 9
Producer 2 pushed 29
Consumer 1 popped 19
Consumer 2 popped 29
Consumer 0 popped 9

​ 与 Stack相比,首先,Queuepush()中调用了 cv_.notify_one(),同时,会多出 wait_and_pop()接口;然后,对于接口 try_pop(),当容器为空时,Stack会抛异常,而 Queue会返回一个 bool值。

​ 对于 try_pop()接口实现中,可以通过返回一个 bool值或一个 shared指针,通过判断返回值是否为 true或返回指针是否为 nullptr就能判断当前操作是否出现异常。所以为了正确实现函数的行为和语义,可以考虑以下三种方式:

(1) 抛出异常;
(2) 返回bool值;
(3) 返回指针;

​ wait_and_pop()是等待队列中被放入数据的一个解决方案,相较于持续调用 empty(),等待线程调用 wait_and_pop()函数要好很多。cv_.wait()会等待直到队列中有元素时,才会返回,这样就不用担心空队列地情况了,而且数据会一直被互斥锁保护。

​ 当不止一个线程等待队列 push()操作时,只会有一个线程收到由于 cv_.notify_one()通知而继续工作。但是,当这个工作线程在 wait_and_pop()中,如果构造 std::shared_ptr<T>对象时抛出异常(如,内存分配失败),会出现问题:

  • (1) push()函数中,向队列中每放入一个元素,就会调用 notify_one()唤醒一个线程来处理;
  • (2) 当异常发生时,std::unique_lock会自动释放互斥锁 mtx_,然后当前线程提前退出,不继续后续代码,包括弹出元素和返回结果,而其它线程仍然处于等待队列非空条件,无法被及时唤醒;
  • (3) 这样就会出现,队列中还有元素,而等待线程仍然处于等待状态,因为它们没有收到 cv_的唤醒;

​ 若这种情况是不可接受的,第一种改造方案是在 push()接口中,将 notify_one()改为 notify_all(),唤醒所有工作线程,不过当大多数线程发现队列仍然为空时,又会耗费很多资源让线程重新进入休眠状态。

​ 第二种改造方案是当有异常抛出时,让 wait_and_pop()函数调用 notify_one(),从而唤醒另外一个线程来处理当前任务:

std::shared_ptr<T> try_pop() {
  std::lock_guard<std::mutex> lock(mtx_);
  if (queue_.empty()) {
    return std::shared_ptr<T>();
  }
  try {
    std::shared_ptr<T> result (
      std::make_shared<T>(queue_.front())
    );
    queue_.pop();
    return result;
  } catch (const std::bad_alloc &e) {
    std::cerr << "Memory allocation failed: " << e.what() << std::endl;
    cv_.notify_one();
    return std::shared_ptr<T>();
  }  
}

​ 第三种改造方案是将 std::shared_ptr<T>初始化过程转移到 push()中,队列中存储 std::shared_ptr<T>实例,而非直接使用数值。将 std::shared_ptr<T>拷贝到 std::queue<T>中就不会抛出异常:

#include <mutex>
#include <queue>
#include <memory>
#include <condition_variable>


template <typename T>
class Queue {
public:
  Queue() = default;

  void push(T value) {
    std::lock_guard<std::mutex> lock(mtx_);
    queue_.push(std::make_shared<T>(std::move(value)));
    cv_.notify_one();
  }

  void wait_and_pop(T &value) {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] {
      return !queue_.empty();
    });
    value = std::move(*queue_.front());
    queue_.pop();
  }

  std::shared_ptr<T> wait_and_pop() {
    std::unique_lock<std::mutex> lock(mtx_);
    cv_.wait(lock, [this] {
      return !queue_.empty();
    });
    std::shared_ptr<T> result = queue_.front();
    queue_.pop();
    return result;
  }

  bool try_pop(T &value) {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) {
      return false;
    }
    value = std::move(*queue_.front());
    queue_.pop();
    return true;
  }

  std::shared_ptr<T> try_pop() {
    std::lock_guard<std::mutex> lock(mtx_);
    if (queue_.empty()) {
      return std::shared_ptr<T>();
    }
    std::shared_ptr<T> result = queue_.front();
    queue_.pop();
    return result;  
  }

  bool empty() const {
    std::lock_guard<std::mutex> lock(mtx_);
    return queue_.empty();
  }

private:
  mutable std::mutex mtx_;
  std::queue<std::shared_ptr<T>> queue_;
  std::condition_variable cv_;
};

4 线程安全链表——使用细粒度锁和条件变量

4.1 单线程版本链表

​ 先看一个单线程版本的链表实现:

/*
 * @Author: chenjingyu
 * @Date: 2025-01-06 14:34:36
 * @Contact: 2458006466@qq.com
 * @Description: List
 */
#include <memory>

template <typename T>
class List {
private:
  struct Node {
    T data;
    std::unique_ptr<Node> next;
    Node(T d) : data(std::move(d)) {
    }
  };

  std::unique_ptr<Node> head_;
  Node *tail_;

public:
  List() = default;

  List(const List &) = delete;
  List &operator=(const List &) = delete;

  std::shared_ptr<T> try_pop() {
    if (!head_) {
      return std::shared_ptr<T>();
    }

    std::shared_ptr<T> const result (
      std::make_shared<T>(std::move(head_->data))
    );
    std::unique_ptr<T> const old_head = std::move(head_);
    // 读取head_的next指针
    head_ = std::move(old_head->next);
    if (!head_) {
      tail_ = nullptr;
    }  
    return result;
  }
  
  void push(T value) {
    std::unique_ptr<Node> ptr(new Node(std::move(value)));
    Node *const new_tail = ptr.get();
    if (tail_) {
      // 写入tail_的next指针
      tail_->next = std::move(ptr);
    } else {
      head_ = std::move(ptr);
    }
    tail_ = new_tail;
  }
};

​ 链表中包含一个指向链表第一个元素的头指针 head_,每个元素都会指向下一个元素。从链表中删除数据,其实就是将头指针指向下一个元素,并将之前头指针指向的元素取出即可。

​ 向链表中添加元素是从尾部进行的,为了做到这点,队列中还有一个尾指针 tail_,其指向链表的最后一个元素。新节点加入会改变尾指针的 next指针指向新接入节点,然后新加入节点会成为新的尾指针。当链表为空时,头/尾指针都是 nullptr

​ 代码中使用 std::unique_ptr<Node>来管理节点,保证节点在删除时,不需要使用 delete操作显式删除,这样的关系链表管理着从头到尾每个节点的原始指针。

​ 虽然,这种实现对于单线程来说没有什么问题,但是当你在多线程尝试使用细粒度锁时,就会出现问题。给定实现中有两个数据项(head_tail_)需要保护,但是,即使用两个互斥量来保护头指针和尾指针也会出现问题:

  • push方法可以同时修改头指针和尾指针,需要同时获取两个互斥量来保护这些操作;
  • pushtry_pop方法都可以访问 next指针,当链表中只有一个元素时,head_tail_指向同一个节点,这个节点的 next指针需要保护;
  • pushtry_pop方法不能共享同一个锁来保护 next指针的访问,因为它们的操作是独立的,需要分别获取不同的锁。

4.2 分离数据实现并发

​ 可以预分配一个虚拟节点(无数据),确保这个节点永远位于队列尾(tail_一直都指向一个虚拟节点),用来分离头尾指针能访问的节点。

/*
 * @Author: chenjingyu
 * @Date: 2025-01-06 14:34:36
 * @Contact: 2458006466@qq.com
 * @Description: List
 */
#include <memory>

template <typename T>
class List {
private:
  struct Node {
    std::shared_ptr<T> data;
    std::unique_ptr<Node> next;
  };

  std::unique_ptr<Node> head_;
  Node *tail_;

public:
  List() : head_(new Node()), tail_(head_.get()) {}

  List(const List &) = delete;
  List &operator=(const List &) = delete;

  std::shared_ptr<T> try_pop() {
    if (head_.get() == tail_) {
      return std::shared_ptr<T>();
    }

    std::shared_ptr<T> const result (head_->data);
    std::unique_ptr<T> const old_head = std::move(head_);
    head_ = std::move(old_head->next);  
    return result;
  }
  
  void push(T value) {
    std::shared_ptr<T> data(
      std::make_shared<T>(std::move(value))
    );
    std::unique_ptr<Node> ptr(new Node);
    tail_->data = data;
    Node *const new_tail = ptr.get();
    tail_->next = std::move(ptr);
    tail_ = new_tail;
  }  
};

​ 对于空链表来说, head_tail_都属于虚拟指针,而非空指针,使用这个方法后,当链表为空时,try_pop()不能访问 head_->next ,当添加新节点时(有真实节点),head_tail_现在指向不同节点,就不会在 head_->next tail_next上产生竞争,缺点就是必须添加一个间接层次的指针数据作为虚拟节点。

​ 修改后,push()只能访问 tail_,而不能访问 head_,这就是一个进步,try_pop()可以访问 head_tail_,但 tail_只需在最初进行比较,存在时间很短。这个重大的提升在于,虚拟节点意味着 try_pop()push()不能对同一个节点进行操作,这里已无需互斥了,那么只需要使用一个互斥量来保护 head_tail_即可,那么锁应该加在那里?

4.3 线程安全链表——细粒度锁版

​ 我们的目的在于最大程度并发化,所以需要上锁时间尽可能短。push()很简单,互斥量需要对 tail_访问上锁,这意味着你需要对每个新分配节点上锁,还有对当前尾节点赋值时也需上锁,锁需要持续到函数结束时才能解开。

​ try_pop()就不简单了,需要使用互斥量锁住 head_,一直 head_被弹出,但是访问 tail_时,也需要一个尾互斥量,由于只需要访问 tail_一次,所以最后将访问 tail_的过程用函数封装一下:

/*
 * @Author: chenjingyu
 * @Date: 2025-01-06 14:34:36
 * @Contact: 2458006466@qq.com
 * @Description: List
 */
#include <memory>
#include <mutex>

template <typename T>
class List {
private:
  struct Node {
    std::shared_ptr<T> data;
    std::unique_ptr<Node> next;
  };

  std::unique_ptr<Node> head_;
  Node *tail_;
  std::mutex head_mtx_;
  std::mutex tail_mtx_;

private:
  Node *get_tail() {
    std::lock_guard<std::mutex> tail_lock(tail_mtx_);
    return tail_;
  }

  std::unique_ptr<Node> pop_head() {
    std::lock_guard<std::mutex> lock_head(head_mtx_);
    if (head_.get() == get_tail()) {
      return nullptr;
    }
    std::unique_ptr<Node> const old_head = std::move(head_);
    head_ = std::move(old_head->next);
    return old_head;
  }

public:
  List() : head_(new Node()), tail_(head_.get()) {}

  List(const List &) = delete;
  List &operator=(const List &) = delete;

  std::shared_ptr<T> try_pop() {
    std::unique_ptr<T> old_head = pop_head(); 
    return old_head ? old_head : nullptr;
  }
  
  void push(T value) {
    std::shared_ptr<T> data(
      std::make_shared<T>(std::move(value))
    );
    std::unique_ptr<Node> ptr(new Node);
    Node *const new_tail = ptr.get();

    std::lock_guard<std::mutex> tail_lock(tail_mtx_);
    tail_->data = data;
    tail_->next = std::move(ptr);
    tail_ = new_tail;
  }  
};

​ 其它什么 wait_for_pop()实现,基于锁设计更复杂的数据结构这些内容后面再啃吧,感觉快吐了。

5.参考资料

  • [1] C++并发编程实战(第二版)

评论