线程间的共享数据的问题全部归咎于修改数据,而读取数据则没有问题。
比如,在线程 A 中,对不变量 (invariants) 的更改会影响线程 B 对它状态的读取,以至于线程 B 无法准确地获得它的状态,从而使得基于这个状态而产生的对这个不变量的后续更改出现问题。
举例来说,当线程 A 正在删除链表中的某个元素(学过数据结构都明白,这一般不是一步操作就能完成的),线程 B 在某个时刻尝试读取这个链表,那么在此时,可能线程 B 只能看到元素的某个部分被删除(即删除过程还没完成),对于线程 B 来说,这个不变量就被破坏了,如果在线程 B 中尝试访问(比如遍历)甚至是修改这个链表的元素,那么很有可能会出现一些意外的问题,轻则操作失败,重则数据结构损坏或者程序崩溃。这种问题通常是随机出现且不可预测的。
说明:不变量指是的站在某个线程的角度,去认定(或者说默许)这个量在线程执行某项操作的整个过程中是不被改变的量。
从另一个角度来看,线程之间在同一个数据上各自执行某项操作时,是一种「竞争关系」,它们执行的顺序存在先后之分,但是总是在操作相同的数据。
竞争条件 (race condition) 即是执行的结果取决于多个线程上操作的执行顺序。在 C++ 标准中定义了「数据竞争」这个术语来描述一种特殊的竞争条件,即由于并发地去修改一个单一对象,数据竞争会导致可怕的未定义行为 (undefined behavior)。
对有问题的竞争条件,以下是几种有效的编程方法可以避免。
有锁编程:对其他的线程屏蔽修改的中间过程,只有进行修改的线程能看到不变量的中间状态。也就是从其他线程来看,要么修改还没开始,要么已告完成。
无锁编程:让修改由一系列不可分割的(或者说原子的)变化来完成,每一个变化都维持不变量,以至于最终它们共同维持了不变量,即无锁编程。
事务编程:将修改数据的操作当做事务 (transaction) 来处理,就像在一个事务中更新数据库一样,所需的一系列操作都存储在一个事务日志中,只有提交之后,修改才会真正应用到数据上。如果数据被另一个线程修改了而无法修改,那么事务就会重启,这又叫做软件事务内存 (STM, Software Transaction Memory) 。
如果不想让程序陷入竞争条件,我们可以将所有访问数据结构的代码都标记为互斥的 (mutally exclusive) ,这样做就可以在一个线程执行这段代码的时候,其他线程若想进入就必须等待,直到前面的线程将这段代码执行完毕。从其他线程的角度来看,不变量并没有被破坏,也就是说要么数据还没开始修改,要么修改已经完成。最常用的互斥方式就是采用互斥锁,就像试衣间一样。当线程进入代码时,如果互斥锁已上锁,那么就会等待它解锁;否则就上锁并执行这个代码,运行完这段代码再解锁,以便其他线程能够上锁。
C++ 标准库提供了互斥锁的基本功能,通过构建一个 std::mutex
实例来创建一个互斥锁,并且它提供了一个成员函数 lock()
对互斥锁上锁,unlock()
对互斥锁解锁。在代码实践中一般不推荐直接使用这些成员函数手动完成上锁/解锁操作,因为你需要成对地使用这些函数,并且需要手动地控制,以避免一些复杂的情形,比如发生异常。
进一步地,C++ 标准库提供了基于 RAII 机制的 std::lock_guard<>
类模板,在构造时对指定的锁进行上锁,在析构的时候将其解锁,这样就保证了一个已经锁住的互斥锁能够被正确地解锁。
std::mutex
与std::lock_guard<>
都在mutex
头文件中被包含。
在存在竞争条件的情况下,对链表进行保护
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> lst;
std::mutex mtx;
void add_to_list(int value) {
std::lock_guard<std::mutex> guard(mtx); // 指定锁
lst.push_back(value);
}
bool list_contains(int value) { // 在链表中,查找一个值是否存在
std::lock_guard<std::mutex> guard(mtx); // 指定锁
return std::find(lst.begin(), lst.end(), value) != lst.end(); // 通过迭代器调用 std::find 算法,查找元素
}
在 C++ 17 标准中,语言支持了类模板参数自动推导,所以 std::lock_guard 可以完全省略尖括号和其中的模板参数。
C++ 17 提供了更强的保护机制,即
std::scoped_lock
类模板,与std::lock_guard
用法大致相同,在上述代码中可完全替换。
为了使得上述的互斥锁是稳定的且有效的,就应该考虑额外的情况,否则互斥锁对共享数据的保护可能将形同虚设。比如:
迷失 (stray) 指针/引用
即可以直接通过函数、甚至是赋值过程获得被保护数据的指针或者引用,使得它暴露在保护之外,代码示例如下。
struct data_type {
int value;
};
class data_wrapper {
private:
data_type data;
std::mutex m;
public:
void do_something() {
std::lock_guard<std::mutex> guard(m);
/* 临界区的其他代码 */
}
data_type * get_stray_ptr() {
return &data; // 返回被保护数据的指针
}
data_type & get_stray_ref() {
return data; // 返回被保护数据的引用
}
};
在回调函数中传递被保护数据的指针/引用
这种情况与前者类似,但是更加隐晦,当我们在编写类似代码时,须再三考察。
struct data_type {
int value;
};
class data_wrapper {
private:
data_type data;
std::mutex m;
public:
template<typename F> void process_callback(F func) {
std::lock_guard<std::mutex> guard(m);
func(data);
}
};
data_type *unprotected;
void malicious_func(data_type & data) {
unprotected = &data;
}
data_wrapper w;
void foo() {
w.process(malicious_func); // 传递恶意函数,这会使得被保护数据被套出
// 至此 unprotected 已经获得被保护的数据
}
即便我们设计了一个类似 std::stack
类,对 top()
和 pop()
两个成员的内部都使用互斥锁保护了,但在代码实践过程中的某些原因,在操作共享数据的时候依然会存在潜在的竞争条件。
比如,当两个线程 A 和 B,都将在一个共享的 std::stack
对象上运行 top()
判定栈是否为空,如果非空就进行 pop()
,这时就会产生竞争条件。具体地说,假设栈中只存在一个元素,线程 A 可能刚完成 top()
确定了栈非空,即将要运行 pop()
弹出栈顶元素,但这时由于并发执行顺序的问题,线程 B 此时运行了 top()
也确定了栈为空,然后线程 A 接着执行 pop()
,最后,线程 B 在不知道线程 A 已经将唯一元素弹出(或者说此时它还认为栈是非空的)继续执行 pop()
尝试弹出元素,就会引发错误。
1、扩大临界区:将 top()
与 pop()
组合起来一起加锁。
2、传入引用以获取结果:整合两者功能并改造弹出函数为 pop(T & result)
,不过对于自定义的元素类型,需要支持拷贝构造,甚至是移动构造(按值返回)。这个方法会有一定的限制,不仅是效率上的,还有就是元素对象的先行构造不一定具备条件(比如元素类型是 std::thread
)。
3、要求无异常抛出的拷贝或移动构造函数:如果拷贝/移动过程抛出异常,将直接影响线程安全(锁的解锁、线程的异常终止 .... 这一系列潜在问题)。
4、返回指向弹出值的指针:指针是可以自由拷贝的平凡 (trivial) 类型,并且不会产生异常(间接地避免了「3」),并且为了更加安全,可使用共享指针 std::shared_ptr
来替代原生指针。
5、组合「2」「3」或者「3」「4」
灵活地在代码中,因地制宜地使用各种方式避免潜在的竞争条件才是上策。
以下是线程安全的 threadsafe_stack
类设计与示例代码。
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack : std::exception {
const char * what() const throw() {
return "empty_stack";
}
}
template<typename T>
class threadsafe_stack {
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() : data(std::stack<T>()) {}
threadsafe_stack(const threadsafe_stack & other) {
std::lock_guard<std::mutex> lock(other.m); // 先加锁,再拷贝
data = other.data;
}
threadsafe_stack & operator=(const threadsafe_stack &) = delete;
void push(T val) {
std::lock_guard<std::mutex> lock(m);
data.push(val);
}
std::shared_ptr<T> pop() {
std::lock_guard<std::mutex> lock(m);
if (data.empty())
throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T & value) {
std::lock_guard<std::mutex> lock(m);
if (data.empty())
throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const {
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
当临界区过大的时候,同样会面临一些问题,主要是性能上这将降低并发带来的性能提升收益。所以,最好在确保线程安全的情况下,尽可能缩小临界区,减小锁的粒度。
如果需要在一个操作中保护多个数据,通常有两种方式。
通过一个互斥锁,增大锁的粒度(扩大临界区)来覆盖全部数据。
当互斥锁需要保护一个类的不同实例时,在下层上锁意味着要么「将上锁动作交给上层的用户」,要么「用一把锁保护所有实例」。无论如何,这两种方式都不是很好。
使用多个互斥锁来保护多个数据。
死锁 (deadlock) 的问题是最需要考虑的,这将导致每个线程都互相等待,双方都无法继续运行下去。
避免死锁的一般建议是,让互斥锁在任何情况下总是以相同的顺序上锁。
例外:在一些特殊情况下,比如同一个类的两个不同实例进行数据的交换操作,为了避免数据被并发修改,每个实例上的互斥锁均需锁住,这时如果总是以固定的顺序上锁(比如先是第一个参数,然后是第二个参数),在这个基础上只要交换一下两个参数的位置。这样两个线程试图在两个相同的实例之间交换数据,就会发生死锁。
毕竟,理论上 a 与 b 进行交换其实就是 b 与 a 交换,可就是这样,死锁也有可能随着用户调用时传递的参数位置变动而产生。
为了解决手动控制上锁顺序产生的麻烦,C++ 标准库提供了 std::lock
函数,它可以一次性锁住多个互斥锁,并且它经过特殊设计,没有死锁的风险。
以下是使用 std::lock
函数避免并发数据交换产生死锁问题的示例代码。
class X {
private:
std::mutex m;
some_type some_data;
public:
X(const some_type & sd) : some_data(sd) {}
friend void swap(X & lhs, X & rhs) {
if (&lhs == &rhs)
return; // 避免自复制
std::lock(lhs.m, rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.some_data, rhs.some_data);
}
};
其中,std::adopt_lock
表示互斥锁已经上锁,使得 std::lock_guard
只是接过它的所有权。
std::lock
提供了「要么全做要么不做」的语义 (all-or-nothing),如果锁定某个锁时抛出了异常,那么在它之前锁定的锁会被释放。
在 C++ 17 标准中提供了新的 RAII 模板 std::scoped_lock
,使得以上的操作可以被整合到一起。
std::scoped_lock
提供了可变参数,接受一个互斥锁类型的列表作为模板参数,并且将使用与 std::lock
相同的算法对这些互斥锁上锁。当实例被构造的时候,传递给其构造函数的互斥锁全部被上锁。类似地,析构的时候,互斥锁全部被解锁。
所以上述代码中的 swap 函数可被重写成以下形式 (C++ 17)。
void swap(X & lhs, X & rhs) {
if (&lhs == &rhs)
return;
std::scoped_lock guard(lhs.m, rhs.m); // 或者可以写成 std::scoped_lock<std::mutex, std::mutex> guard(lhs.m, rhs.m);
swap(lhs.some_data, rhs.some_data);
};
这样写不仅简便,而且不容易出错。
避免嵌套锁
这是最简单的原则,即如果线程持有了一把锁就不要再去获取一把锁。
虽然线程的相互等待也可能会造成死锁,但这个原则至少规避了锁用法层面的死锁问题。
避免持有锁时调用用户提供的代码
比如当你编写一个框架或者库的时候,在实现某项功能时需要调用用户提供的代码,那么就应当注意 —— 你无法得知用户的代码会做什么,尤其是如果用户代码也在尝试获取一把锁。
这一条其实是对上一条原则的延伸。
使用固定顺序获取锁
如果有特殊要求,无法使用 std::lock
以及其他相关的方法来获取锁的时候,就需要在每个线程上都是以固定的顺序获取它们。
但要注意一定保证「固定的顺序」,像之前的交换例子就是一个典型案例,类似的还有并发地以顺序及逆序遍历链表,它们将锁住感兴趣的节点(比如删除需要 3 个互斥锁),也有可能产生死锁。这又被称为交叉的 (hand-over-hand) 锁风格,对于链表的遍历,其解决方案是限制遍历的方向,以固定的顺序遍历链表。
使用锁的层次结构
锁的层次结构可以提供一种在运行时检查获取锁是否遵守约定的方案,锁的层次结构将应用程序分为多个层,并识别任何给定层中可能已锁住的所有互斥锁,当线程试图对一个互斥锁上锁,如果这个锁已经被下层持有,那么就无法上锁。这种层号可以被分配到每个互斥锁上,以表示这个互斥锁所在的层次,层次结构也将记录线程锁定了哪些锁。
不多说,其实很简单,上代码。
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);
hierarchical_mutex other_mutex(6000);
int do_low_level_stuff();
int low_level_func() {
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}
int do_high_level_stuff(int some_param);
int high_level_func() {
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_high_level_stuff(do_level_func());
}
void do_other_stuff();
void other_stuff() {
high_level_func();
do_other_stuff();
}
void thread_a() { // 遵守了层次结构规则
high_level_func();
}
voidf thread_b() { // 没有遵守层次结构规则
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();
}
hierarchical_mutex 在上锁时,当目前互斥锁的层级值比之前大的时候,它将报告一个错误。
而这种方式恰好约束了上锁顺序,当前层级值的锁必须在没有比它更低(小)层级值的锁被锁上的情况下,才可以被上锁。
对于 std::lock_guard
类模板,它接收一个类型作为模板参数,其中它需要实现三个与互斥锁相关的成员函数,分别是 lock()
、unlock()
与 try_lock()
。因此我们也可以在这个基础上构建我们自己的带有层次结构特性的互斥锁。
class hierarchical_mutex {
std::mutex internal_mutex;
const uint32_t hierarchy_value;
uint32_t previous_hierarchy_value;
static thread_local uint32_t this_thread_hierarchy_value;
void check_for_hierarchy_violation() {
if (this_thread_hierarchy_value <= hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value() {
previous_hierarchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hierarchy_value;
}
public:
explicit hierarchical_mutex(uint32_t value) :
hierarchy_value(value),
pervious_hierarchy_value(0)
{}
void lock() {
check_for_hierarchy_violation(); // 检查
internal_mutex.lock(); // 锁定
update_hierarchy_value(); // 更新
}
void unlock() {
if (this_thread_hierarchy_value != hierarchy_value)
throw std::logic_error("mutex hierarchy violated");
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock() {
check_for_hierarchy_violation();
if (!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
thread_local uint32_t hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
🖊️ 本文由 Alone Café 创作,如果您觉得本文让您有所收获,请随意赞赏 🥺
⚖️ 本文以 CC BY-NC-SA 4.0,即《署名-非商业性使用-相同方式共享 4.0 国际许可协议》进行许可
👨⚖️ 本站所发表的文章除注明转载或出处外,均为本站作者原创或翻译,转载前请务必署名并遵守上述协议
🔗 原文链接:https://alone.cafe/2021/11/cpp并发编程-3
📅 最后更新:2021年11月07日 Sunday 12:37
Update your browser to view this website correctly. Update my browser now