并发是指同时执行多个任务,其意义有二:
提高吞吐率:让多个处理器共同参与一个计算过程
提高响应速度:允许程序的一部分在另一部分处于等待状态时保持运行
几乎所有现代编程语言都对并发提供了不同程度的支持。C++标准库并发特性的前身已在C++社区使用了20多年,经过对可移植性和类型安全性的持续改进,几乎适用于所有现代硬件平台和操作系统。标准库并发特性的重点是提供对系统级并发机制的支持,而非另行构建复杂的高层并发模型。那些高层并发模型,可以基于标准库工具实现,并以第三方库的形式提供给开发用户。
标准库的并发特性直接支持在单一地址空间内同时执行多个线程的编程模型。为此,C++提供了合适的内存模型和一套原子化操作。原子操作允许无锁编程,而内存模型则保证了,只要程序员能有效地避免数据竞争(对可变数据的不受控制的并发访问),一切都会如人们所期望的那样工作。然而,大多数用户所理解的并发,更多还是源于对标准库及基于标准库的其它第三方库的使用。因此,这里有必要强调标准库的主要并发特性,如thread、mutex、lock、packaged_task和future等组件,并给出一些示例。这些特性直接根植于操作系统所提供的内核服务(系统调用),但与直接调用系统调用不同,标准库的并发特性不会产生额外的性能开销,当然也不可能保证显著的性能改进。
永远不要迷信并发是解决一切性能问题的灵丹妙药。如果一项任务可以按顺序完成,那么这样做通常更简单,也更高效。因为,将信息从一个线程传递到另一个线程本身,就是代价高昂的操作。
作为采用显式并发特性的替代方案,通常还可以借助并行算法,充分发挥多个执行引擎共同完成一个计算任务的能力,以获得更好的性能表现。
此外,C++还支持协程,以使函数可以在调用之间保持各自的状态。
C++11:内存模型
所谓任务(task),就是可与其它计算并发执行的计算过程,而所谓线程(thread),就是任务在程序中的系统级表示。若要启动一个可与其它任务并发执行的任务,就需要创建一个thread类型的对象,该类型的定义位于<thread>头文件中,同时将表示任务的函数或函数对象,作为传递给该对象构造函数的参数。例如:
xxxxxxxxxx
51// 表示任务的函数
2
3void task() {
4 ... 任务的执行过程 ...
5}
xxxxxxxxxx
81// 表示任务的函数对象类
2
3class Task {
4public:
5 void operator()() {
6 ... 任务的执行过程 ...
7 }
8};
xxxxxxxxxx
51thread t1{ task }; // 用表示任务的函数创建线程对象,该函数在独立的线程中执行
2thread t2{ Task{} }; // 用表示任务的函数对象创建线程对象,该函数对象在独立的线程中执行
3...
4t1.join(); // 等待第一个子线程结束
5t2.join(); // 等待第二个子线程结束
join函数保证其后的代码在所等待的线程结束之后才会被执行。这个过程称为线程会合,即等待线程结束。
join函数的问题在于很容易被人们忘记,而一旦忘记,后果通常会很严重。为此,标准库提供了jthread类,该类与thread类的不同之处在于,通过在其析构函数中调用join函数,实现了所谓自动会合的功能。例如:
xxxxxxxxxx
21jthread t1{ task }; // 用表示任务的函数创建线程对象,该函数在独立的线程中执行,线程对象析构时会合该线程
2jthread t2{ Task{} }; // 用表示任务的函数对象创建线程对象,该函数对象在独立的线程中执行,线程对象析构时会合该线程
线程会合由线程对象的析构函数自动完成,而析构函数的调用顺序与构造函数相反,因此这里是先会合t2所表示的线程,再会合t1所表示的线程。
一个程序中的所有线程共享同一个地址空间。这也是线程与进程的主要区别之一,进程间通常不共享地址空间。由于线程共享同一个地址空间,因此线程间可以通过共享数据相互通信,同时借助锁或其它同步机制,规避因数据竞争(对可变数据的不受控制的并发访问)而引发的线程冲突。
编写线程安全的代码并不容易。例如:
xxxxxxxxxx
51// 表示任务的函数
2
3void task() {
4 cout << "Hello, ";
5}
和
xxxxxxxxxx
81// 表示任务的函数对象类
2
3class Task {
4public:
5 void operator()() {
6 cout << "World!" << endl;
7 }
8};
这是一个典型的严重错误。task和Task都向cout表示的输出流对象中插入数据,而没有采取任何形式的同步。最终的输出结果将不可预测,而且每次运行程序都可能得到不同的输出结果,因为两个线程的执行顺序的不确定的。程序可能产生下面这样的奇怪输出:
xxxxxxxxxx
11WoHerllod, !
只有来自标准的特定保证,才能避免那些可能导致崩溃的数据竞争。对于输出流而言,要么只让一个线程使用输出流,要么使用osyncstream。
定义并发任务所追求的目标,是保持任务间的完全隔离,唯一的例外是与其它任务通信的部分,而这种通信应该以简单而明显的方式进行。思考并发任务的最简单方式,是将其视作一个可以与调用者过程并发执行的函数,只需为其传递参数,获取结果,并保证二者不同时使用共享数据,即不发生数据竞争即可。
C++11:thread
典型地,线程需要处理数据。可以将数据、指向数据的指针或引用数据的引用,作为参数传递给线程。例如:
xxxxxxxxxx
61void task(char ch, milliseconds ms) {
2 for (int i = 0; i < 100; ++i) {
3 cout << ch << flush;
4 sleep_for(ms);
5 }
6}
xxxxxxxxxx
151class Task {
2public:
3 Task(char ch, milliseconds ms) : ch(ch), ms(ms) {}
4
5 void operator()() {
6 for (int i = 0; i < 20; ++i) {
7 sleep_for(ms);
8 cout << ch << flush;
9 }
10 }
11
12private:
13 char ch;
14 milliseconds ms;
15};
xxxxxxxxxx
21jthread t1{ task, '.', 100ms };
2jthread t2{ Task{ '|', 500ms } };
向线程传递参数有两种方式:
通过thread或jthread类的构造函数,向线程传递参数,如“jthread t1{ task, '.', 100ms }”。thread和jthread类的构造函数的第一个参数为表示任务的函数或函数对象,其余参数为传递给该任务函数的参数,类型任意,数量不定,编译器负责匹配性检查。thread和jthread其实都是带有可变参数列表的模板类
将线程所需要的参数定义为表示任务的函数对象的成员,通过该函数对象的构造函数,向线程传递参数,如“jthread t2{ Task{ '|', 500ms } }”
无论以哪种方式向线程传参,如果线程所持有的仅仅是某个对象的指针或引用,一定要保证该指针或引用的目标对象的生命周期,不短于其在线程中被访问的周期。从这个意义上讲,向线程传递值,比传递指针或引用更安全。
表示任务的函数和函数对象都不能带有返回值,因此线程的处理结果(如果有的话),只能通过该函数或函数对象的输出参数或输出成员,提供给线程的创建者。例如:
xxxxxxxxxx
101void task(char ch, milliseconds ms, milliseconds& elapsed) {
2 auto start = system_clock::now();
3
4 for (int i = 0; i < 100; ++i) {
5 cout << ch << flush;
6 sleep_for(ms);
7 }
8
9 elapsed = duration_cast<milliseconds>(system_clock::now() - start);
10}
xxxxxxxxxx
211class Task {
2public:
3 Task(char ch, milliseconds ms, milliseconds& elapsed)
4 : ch(ch), ms(ms), elapsed(elapsed) {}
5
6 void operator()() {
7 auto start = system_clock::now();
8
9 for (int i = 0; i < 20; ++i) {
10 sleep_for(ms);
11 cout << ch << flush;
12 }
13
14 elapsed = duration_cast<milliseconds>(system_clock::now() - start);
15 }
16
17private:
18 char ch;
19 milliseconds ms;
20 milliseconds& elapsed;
21};
xxxxxxxxxx
91milliseconds elapsed1, elapsed2;
2
3thread t1{ task, '.', 100ms, ref(elapsed1) };
4thread t2{ Task{ '|', 500ms, elapsed2 } };
5
6t1.join();
7t2.join();
8
9cout << elapsed1 << ' ' << elapsed2 << endl; // 10860ms 10224ms
这里使用了来自<functional>头文件的类型函数ref,以显式告诉编译器将elapsed1视为引用,否则编译器将按对象方式推导thread类模板的相应类型参数,而这与task函数第三个参数的类型并不一致,报告编译错误。
这种获得线程结果的方法的确可行,而且应用得也很普遍,只是通过指针或引用输出数据,显得不太优雅。
有时需要在多个线程间共享数据,这时就必须对访问进行同步。原则上在任何时候,都只允许最多一个线程访问共享数据。当然,多个线程同时读取不可变数据,其实是没有问题的。
mutex是一种互斥对象,是线程间并发访问共享数据的关键元素。线程通过调用mutex对象的lock成员函数,获得该mutex,并通过调用其unlock成员函数释放之。任何时候,一个mutex最多只能被一个线程所持有,任何试图获取已为其它线程所持有的mutex的操作,都会阻塞,直到其持有者主动释放,阻塞的一方才会被唤醒,并在成功获取该mutex后,成为其新的持有者,直至主动释放。标准库在<mutex>头文件中给出了关于mutex的定义。
例如:
xxxxxxxxxx
21int g;
2mutex m;
xxxxxxxxxx
71void task() {
2 for (int i = 0; i < 10000; ++i) {
3 m.lock();
4 ++g;
5 m.unlock();
6 }
7}
xxxxxxxxxx
61thread t1{ task }, t2{ task };
2
3t1.join();
4t2.join();
5
6cout << g << endl; // 20000
scoped_lock<mutex>类在构造函数中获取mutex,并在析构函数中释放mutex。因此,可以将访问共享数据的代码放在一个独立的作用域(语句块)中,并将scoped_lock<mutex>类型的对象声明为该作用域的局部变量,以此实现对mutex的自动获取和释放。更重要的是,这样做可以避免因提前返回或抛出异常而错过释放mutex。例如:
xxxxxxxxxx
61void task() {
2 for (int i = 0; i < 10000; ++i) {
3 scoped_lock lck{ m };
4 ++g;
5 }
6}
共享数据和mutex之间的对应关系依赖于约定。程序员必须清楚哪个mutex用于保护哪个共享数据。显然,这很容易出错。因此,最好还是利用一些语言手段,让这个对应关系变得更清晰也更好维护。例如:
xxxxxxxxxx
51class Record {
2public:
3 mutex rm;
4 ...
5};
xxxxxxxxxx
11Record rec{ ... };
在访问rec对象的任何部分之前,先获取rec.rm。
需要同时访问多个共享数据的情况并不鲜见,这可能导致死锁。例如,线程1在成功获取mutex1后试图获取mutex2,同时,线程2在成功获取mutex2后试图获取mutex1,这样一来,两个线程都不会继续执行。
scoped_lock可以同时获取多个锁。例如:
xxxxxxxxxx
11scoped_lock lck{ mutex1, mutex2 }; // 获取两个mutex
scoped_lock类的构造函数只有在获取了全部参数mutex后才会继续,并在持有mutex时不会阻塞。scoped_lock类的析构函数负责释放其所持有的全部mutex。
基于共享数据的通信是非常底层的操作。尤其是,程序员必须设法了解各项任务已经完成和尚未完成的工作。在这方面,使用共享数据就不如调用函数并接收返回值。另一方面,有些人笃信共享数据一定比拷贝函数的参数和返回值更有效率。不可否认,当涉及大量数据时的确如此,但频繁获取和释放mutex同样也是有性能开销的,而且开销还不小。再者,现代计算机对于数据复制其实非常擅长,尤其是象vector这样的紧凑型数据。最好不要因效率之故而不假思索地选择基于共享数据的通信,先测量再做选择。
基本的mutex在任何时候都只能被一个线程所持有。但对共享数据的常见访问却往往是,多线程读取和单线程写入,即读共享写独占的同步逻辑。shared_mutex类支持这种读写锁的用法。多个读线程可以同时拥有同一个shared_mutex,这时任何写线程都不能再获取它,而一旦有一个写线程持有这个shared_mutex,任何其它线程,无论读写,都无法再获取它,这就叫读共享写独占。
例如:
xxxxxxxxxx
11shared_mutex m; // 可共享的互斥对象
xxxxxxxxxx
41void reader() {
2 shared_lock lck{ m }; // 以共享方式获取shared_mutex
3 ... 读共享数据 ...
4}
xxxxxxxxxx
41void writer() {
2 unique_lock lck{ m }; // 以独占方式获取shared_mutex
3 ... 写共享数据 ...
4}
C++11:互斥量和锁
C++14:shared_mutex、shared_lock和unique_lock
C++17:scoped_lock
mutex涉及操作系统底层,属于代价较重的线程同步机制。它允许在没有数据竞争的前提下,完成对任意数量共享数据的并发访问。然而,如果只针对少量数据,使用更简单、更轻量级的低成本机制,也不失为一种明智的选择。例如atomic变量,亦称原子量。例如下面的代码,这是一个经典的“双重检查锁定”的简单示例:
xxxxxxxxxx
21mutex m;
2atomic<bool> init = false; // 是否已经初始化
xxxxxxxxxx
71if (!init) {
2 lock_guard lck{ m };
3 if (!init) {
4 ... 初始化操作 ...
5 init = true;
6 }
7}
为了保证“初始化操作”只执行一次,这里使用了“双重检查锁定”策略。其中的init,就是一个bool类型的原子量。借助原子量,可以在不使用mutex的前提下,避免并发访问init时发生数据竞争。另外,这里还是使用lock_guard,它与scoped_lock功能相同,但因其仅获取一个mutex,故比后者简单得多。
C++11:低层并发支持(atomic)
C++20:对atomic的诸多扩展
一个线程有时需要等待某个外部事件,如另一个线程完成了某个任务,或是过去了一段时间。最简单的事件就是时间流逝。使用<chrono>头文件中与时间相关的特性,可以很好地完成这类任务。例如:
xxxxxxxxxx
141import std;
2
3using namespace std;
4using namespace std::this_thread;
5using namespace std::chrono;
6
7int main() {
8 auto start = high_resolution_clock::now();
9
10 sleep_for(nanoseconds{ 1 });
11
12 cout << duration_cast<nanoseconds>(
13 high_resolution_clock::now() - start).count() << "纳秒" << endl; // 15400纳秒
14}
this_thread命名空间中的sleep_for函数,可令当前线程(包括主线程)睡眠参数指定的时间。duration_cast模板函数,用于将参数时间段调整为所期望的单位,如纳秒。
通过外部事件实现线程间通信的基本方法是使用<condition_variable>头文件中的condition_variable类。condition_variable提供了一种机制,它允许一个线程等待另一个线程。特别地,它允许线程等待某个条件(事件)的满足(发生),而这个条件(事件)是否满足(发生),往往取决于其它线程是否完成了某项特定的工作。
condition_variable支持很多优雅而高效的数据共享方式,但也有可能变得相当复杂。考虑两个线程通过一个queue传递消息的经典例子:
xxxxxxxxxx
31queue<string> q; // 消息队列
2mutex m; // 互斥体
3condition_variable cv; // 条件变量
这里的queue、string、mutex和condition_variable皆由标准库提供。
消费者线程负责从消息队列读取消息:
xxxxxxxxxx
101// 消费者线程的任务函数
2
3void consumer() {
4 for (;;) {
5 unique_lock lck{ m };
6 cv.wait(lck, [] { return !q.empty(); }); // 消息队列为空即等待,同时释放互斥体
7 cout << q.front() << endl;
8 q.pop();
9 }
10}
这里通过对mutex上的unique_lock进行显式保护,实现了对queue和condition_variable的操作。condition_variable类的wait成员函数,会在其第二个参数返回false时,令调用线程在condition_variable中等待,同时释放所持有的mutex,直至其被唤醒,并在重新获得该mutex后,再次检查其第二个参数的返回值,以决定是继续等待还是立即返回。这个过程已经考虑到其它消费者线程提前获得mutex并抢先消费的情形。
还有一个细节需要注意,就是传递给condition_variable类wait成员函数的第一个参数,可以是unique_lock类型的对象,但不能是scoped_lock类型的对象。
生产者线程负责向消息队列写入消息:
xxxxxxxxxx
91// 生产者线程的任务函数
2
3void producer() {
4 for (;;) {
5 scoped_lock lck{ m };
6 q.push("message");
7 cv.notify_one(); // 向在cv中等待的线程发出通知
8 }
9}
condition_variable类的notify_one成员函数,负责向该condition_variable中处于等待状态的线程发送通知,唤醒该线程,以令其有机会从wait函数中返回,并继续执行后续操作。
C++11:条件变量
标准库提供了一些特性,允许程序员在抽象的任务层,指派需要同时执行的某项工作,而不必基于底层的线程和锁机制实现并发与同步。这些特性包括:
future和promise:在一个线程中获取另一个线程中就绪的值
packaged_task:连接两个线程中的future和promise
async:以类似函数调用的方式启动独立的线程
C++11:高层并发支持(packaged_thread、future、promise和async)
future和promise的妙处在于,可以在不显式使用任何同步机制的前提下,获取在另一个线程中就绪的值。它的基本思路非常简单,当一个线程需要向另一个线程输出数据时,可将其值置于promise中,而另一个线程则从future中获取该值。
假设所要传输的数据是一个X类型的对象,fx是一个future<X>类型的对象,则从future中获取数据的代码如下:
xxxxxxxxxx
71try {
2 X x = fx.get(); // 获取数据
3 ... 使用数据 ...
4}
5catch (SomeException ex) {
6 ... 处理异常 ...
7}
如果数据尚未就绪,get函数会阻塞,直至数据被放入相应的promise。get函数可能会抛出异常,该异常可能来自系统,也可能来自相应的promise。
假设px是一个promise<X>类型的对象,则向promise中设置数据的代码如下:
xxxxxxxxxx
71try {
2 ... 产生数据 ...
3 px.set_value(x); // 设置数据
4}
5catch (...) {
6 px.set_exception(current_exception()); // 设置异常
7}
其中current_exception函数用于获取当前被捕获的异常。
C++11:拷贝和重新抛出异常
下面的问题是,用于在两个线程间传输数据的future-promise对如何得到呢?标准库为此提供了一个名为packaged_task的类,以简化在线程中操作future和promise的过程。例如只需向packaged_task发出一个get_future请求,它就会返回一个future,并在内部维护与之对应的promise,分别用于读写数据。例如:
xxxxxxxxxx
61int divide(int x, int y) {
2 if (!y)
3 throw runtime_error("divide by zero");
4
5 return x / y;
6}
这是一个很普通的计算整数除法的函数,其中包含除零检测。现在希望在独立的子线程中执行该函数,并在主线程中得到该函数的返回值或可能抛出的异常:
xxxxxxxxxx
161packaged_task p1{ divide };
2packaged_task p2{ divide };
3
4future<int> f1{ p1.get_future() };
5future<int> f2{ p2.get_future() };
6
7jthread t1{ move(p1), 15, 3 };
8jthread t2{ move(p2), 10, 0 };
9
10try {
11 cout << f1.get() << endl;
12 cout << f2.get() << endl;
13}
14catch (const exception& ex) {
15 cerr << ex.what() << endl;
16}
运行该程序,产生如下输出:
xxxxxxxxxx
215
2divide by zero
packaged_task将任务函数或函数对象的类型(int(int,int))作为其模板参数,并将任务函数或函数对象(divide)作为其构造参数。从packaged_task对象中获取用于读取线程输出的future对象,该输出来自任务函数的返回值。创建线程时需要使用move,因为packaged_task不支持拷贝。
这段代码并没有明确提及和锁有关的任何东西,程序员得以专注于任务本身,而非在任务间的通信机制。这两个任务在各自独立的线程中执行,因而可能是并行的。
抛却一切与系统级底层线程有关的概念,人们需要的可能仅仅是一个凑巧需要和其它函数同时执行的函数。这种编程模型虽然不是C++标准库唯一支持的模型,但是可以很好地满足大多数场景下对于并发的需求。
标准库的async函数用于启动一个异步执行的任务。例如:
xxxxxxxxxx
101future<int> f1 = async(divide, 15, 3);
2future<int> f2 = async(divide, 10, 0);
3
4try {
5 cout << f1.get() << endl;
6 cout << f2.get() << endl;
7}
8catch (const exception& ex) {
9 cerr << ex.what() << endl;
10}
async将函数的调用和返回值的获取截然分开,并将这两部分在不同的线程中处理。借助async完全无需操心任何与线程和锁有关的细节,只需专注于需要异步执行的任务本身,如divide。当然,如果多个异步执行的任务需要使用共享资源,且共享资源需要通过锁机制加以保护,则不应该使用async。使用async时,甚至不知道具体创建了多少个线程,这完全由async决定。async根据其被调用时的系统可用资源量确定创建多少个线程。
请注意,使用async并非仅仅为了利用并行计算提高程序的运行性能。它的核心是异步而非并行,虽然并行是实现异步的一种手段。例如程序可能一方面需要从用户处获取输入,另一方面还要执行其它计算。
有时需要终止一个线程的运行,因为不再对其结果感兴趣。简单地杀死线程,通常是不妥当的,因为线程可能正持有某些必须释放的资源,如mutex、子线程或数据库连接等。为此,标准库提供了一种礼貌地请求线程执行清理,而后优雅地离开的机制——stop_token。如果一个线程具有stop_token并被请求终止,则它自然会体面地终止。
假设find_any是一个并行算法,它会产生许多线程并在其中执行find函数以寻找结果。当其中任何一个线程找到结果时,其它线程即可终止运行。这里的find函数符合常见任务风格的典型模式,其中有一个主循环,根据对stop_token的测试,决定是继续还是退出:
xxxxxxxxxx
111automic<int> result = -1; // 结果索引
2
3template<class T> struct Range { T* begin; T* end; }; // 用起止迭代器表示的范围
4
5void find(stop_token st, const string* base, const Range<string> r, const string& key) {
6 for (string* p = r.begin; p != r.end && !st.stop_requested(); ++p)
7 if (math(*p, key)) { // 若与查找目标匹配
8 result = p - base; // 获得匹配元素的索引
9 return;
10 }
11}
stop_token对象的stop_requested成员函数,用于判断是否有其它线程请求终止该线程。stop_token是安全传达此类请求的机制(无数据竞争)。并行算法函数find_any的定义类似下面这个样子:
xxxxxxxxxx
211void find_any(const vector<string>& vs, const string& key) {
2 string* base = &vs[0];
3 int size = vs.size();
4 int mid = size / 2;
5
6 stop_source ss1{};
7 // 开启第一个线程,在前一半中查找
8 jthread t1{ find, ss1.get_token(), base, Range{ base, base + mid }, key };
9
10 stop_source ss2{};
11 // 开启第二个线程,在后一半中查找
12 jthread t2{ find, ss2.get_token(), base, Range{ base + mid, base + size }, key };
13
14 while (result == -1) sleep_for(10ms); // 在自旋循环中等待查找结果
15
16 // 终止所有线程
17 ss1.request_stop();
18 ss2.request_stop();
19
20 ... 使用结果 ...
21}
stop_source对象可以产生stop_token对象,并通过它的request_stop成员函数,向线程传递终止请求。
这里假设查找结果一定可以得到,否则等待查找结果的自旋循环将永远不会退出。
协程是一种在多次调用的过程中保持其状态的函数。在这方面,它有点象函数对象,但在每次调用时,协程可以隐式地、完整地恢复上一次的状态,并保存这一次的状态。例如:
xxxxxxxxxx
101generator<long long> fib() {
2 long long a = 0, b = 1;
3 while (a < b) {
4 auto c = a + b;
5 co_yield c; // 保存状态,返回c的值并等待
6 a = b;
7 b = c;
8 }
9 co_return 0; // 溢出
10}
这是一个典型的求斐波那契数列的函数。第一次调用,返回斐波那契数列的第一项,再次调用,返回第二项,以此类推。例如:
xxxxxxxxxx
21for (int i = 0; i < 10; ++i)
2 cout << fib() << ' '; // 1 2 3 5 8 13 21 34 55 89
协程在每次被调用时,将当前栈帧保存到generator类型的返回值中,并在下一次被调用时从中恢复。co_yield返回值并等待下一次被调用。co_return返回值并终止协程。
协程可以是同步的,即调用者等待结果,也可以是异步的,即调用者执行其它操作,直到协程返回结果。上述斐波那契数列的例子显然是同步的。一些优化器可以内联方式优化协程调用,甚至将上述代码直接优化为:
xxxxxxxxxx
11cout << "1 2 3 5 8 13 21 34 55 89";
协程框架的实现极其灵活,能够服务于极端范围的潜在用途。它是由专家设计并为专家使用的工具,具有标准化委员会的典型设计风格。遗憾的是,直到C++20,仍然缺乏能让简单功能轻松实现的基础设施,甚至连generator模板都还不是标准库的一部分。作为替代方案,可以使用一些第三方库,如CppCoro(A coroutine library for C++)等,详情参见https://github.com/lewissbaker/cppcoro。
C++20:协程
高德纳(Donald E. Knuth)在《计算机程序设计艺术(The Art of Computer Programming)》第一卷中盛赞了协程的实用性,但也感慨于很难给出简短的例子。协程存在的价值即是对复杂系统的简化。C++语言在早期获得成功的原因之一,是其对事件驱动模型的模拟。其中的核心思想是将一个复杂任务,表示为由一系列子任务(协程)组成的系统。其中的每个子任务只负责很小的一部分工作,如数据生成、网络计算、输入输出,等等。子任务和子任务之间通过消息队列通信。每个子任务在产生结果后,即将自己放在一个任务队列中等待新的工作。调度器会在需要时从该队列中选择下一个需要执行的子任务。这就是一个典型的协作式多任务处理系统。这种源自Simula语言的设计思想最终成为构建第一个C++库的基础。
这种设计的关键是:
有许多不同的协程,它们在每次被调用时,仍保留着上次被调用时临返回前的状态
借助某种形式的多态,将分属于不同协程的事件排入事件队列,并以与类型无关的方式处理它们
调度程序负责根据事件选择需要调用的协程
首先,借助运行时多态,统一调用数十甚或数百个不同类型的协程:
xxxxxxxxxx
51class BasicEvent {
2public:
3 virtual ~BasicEvent() {}
4 virtual void operator()() = 0;
5};
xxxxxxxxxx
171template<typename Act>
2class Event : public BasicEvent {
3public:
4 Event(const string& name, Act act) : m_name{ name }, m_act{ move(act) } {}
5
6 void operator()() override {
7 m_act();
8 }
9
10 const string& name() const {
11 return m_name;
12 }
13
14private:
15 string m_name; // 名称
16 Act m_act; // 动作
17};
Event存储动作并调用之,动作通常是一个协程。事件除了包含动作,还可以包含其它内容,比如名称。
下面是一段极简单的使用事件的代码:
xxxxxxxxxx
181// 持有协程的事件列表
2vector<BasicEvent*> events{
3 new Event{ "integers", integers(1, 2) },
4 new Event{ "chars", chars('a') }
5};
6
7// 订单列表
8vector orders{ 0, 1, 1, 0, 1, 0, 1, 0, 0 };
9
10// 调用与每个订单相对应的协程
11for (int order : orders)
12 (*events[order])();
13
14cout << endl;
15
16// 清理
17for (auto event : events)
18 delete event;
其中的integers和chars是两个不同类型的协程:
xxxxxxxxxx
61Task integers(int start, int step = 1) {
2 for (auto value = start; ; value += step) {
3 cout << value << ' '; // 输出
4 co_yield 0; // 挂起
5 }
6}
xxxxxxxxxx
61Task chars(char start) {
2 for (auto value = start; ; ++value) {
3 cout << value << ' '; // 输出
4 co_yield 0; // 挂起
5 }
6}
它们都借助于co_yield在两次调用之间将自己挂起。这意味着Task必须是一个协程句柄。
“魔法”就隐藏在Task中。它保存协程在每次被调用时临返回前的栈帧,并确定co_yield的含义:
xxxxxxxxxx
411class Task {
2public:
3 // 映射到语言类型
4 class promise_type {
5 public:
6 suspend_always initial_suspend() {
7 return {};
8 }
9
10 // 对应co_yield
11 suspend_always yield_value(int) {
12 return {};
13 }
14
15 // 对应co_return
16 suspend_always final_suspend() noexcept {
17 return {};
18 }
19
20 auto get_return_object() {
21 return Task{ Handle::from_promise(*this) };
22 }
23
24 void return_void() {}
25
26 void unhandled_exception() {
27 exit(1);
28 }
29 };
30
31 using Handle = coroutine_handle<promise_type>;
32
33 // 由promise_type::get_return_object调用
34 Task(Handle handle) : m_handle(handle) {}
35
36 void operator()() {
37 m_handle.resume();
38 }
39
40 Handle m_handle; // 协程句柄
41};
除非作为库的实现者,强烈建议不要自己编写这样的代码。网上有很多类似代码的范例可资借鉴。
运行程序,得到如下输出:
xxxxxxxxxx
111 a b 3 c 5 d 7 9
借助并发提高响应速度或吞吐率
只要能承受,尽可能使用高层抽象
将进程作为线程的替代方案
标准库的并发特定是类型安全的
内存模型的存在是为了让大多数程序员不必考虑计算机的硬件架构和级别
内存模型使内存大致按照程序员所期望的那样呈现
原子操作允许程序员编写无锁的代码
无锁编程还是留给专家们吧
有时串行方案比并行方案更简单也更快
避免数据竞争
与其直接使用并发,不如选择并行算法
thread是系统线程的类型安全接口
用join函数等待线程结束
优先使用jthread而非thread
尽可能避免显式地共享数据
尽量使用基于RAII的自动化管理,避免手动执行加锁、解锁
使用scoped_lock管理mutex
使用scoped_lock获取多个mutex
使用shared_lock实现读写锁逻辑
将mutex和被其保护的数据定义到一起
借助atomic实现简单的数据共享
借助condition_variable管理线程间通信
当需要复制锁或执行较低级别的同步操作时,应使用unique_lock而非scoped_lock
在condition_variable中使用unique_lock而非scoped_lock
不要无条件等待,在循环等待中需要增加条件判断
尽量减少在关键部分花费的时间
从并发任务的角度考虑,而非直接从线程的角度考虑
追求简洁
相对于线程和mutex,优先考虑packaged_task和future
向promise写入结果,从future读取结果
通过packaged_task捕获任务(子线程)抛出的异常
借助packaged_task和future表示对外部服务的请求并等待其响应
借助async函数开启简单的并发任务
借助stop_token实现协同式终止
协程可以比线程小得多
尽可能使用成熟的协程库,而不要手工编写底层代码