线程同步方式
互斥锁
概述:
用于保护临界区,确保同一时间只有一个线程可以访问共享资源。常见的互斥锁有std::mutex,std::lock_guard和std::unique_lock
mutex
概述:
用于管理多个线程对共享资源的互斥访问,防止数据竞争和并发问题
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #include <iostream> #include <thread> #include <mutex>
int cnt = 0; std::mutex mtx;
void increment() { for (int i = 0; i < 1000; i++) { mtx.lock(); ++cnt; mtx.unlock(); } }
int main() { std::thread t1(increment); std::thread t2(increment);
t1.join(); t2.join();
std::cout << "Final cnt: " << cnt << std::endl; return 0; }
|
如果不想阻塞线程,可以使用try_lock()
std::recursive_lock:
允许同一线程多次加锁,但是必须匹配相同次数的unlock()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #include <iostream> #include <mutex> #include <thread>
std::recursive_mutex rmtx;
void recursiveFunction(int n) { if (n <= 0) return;
rmtx.lock(); std::cout << "Lokcing " << n << std::endl; recursiveFunction(n-1); rmtx.unlock(); }
int main() { std::thread t1(recursiveFunction, 3); t1.join(); return 0; }
|
lock_guard
概述:
一个轻量级的互斥锁管理器,确保在作用域结束时自动释放
主要特点:
- 自动管理锁的获取和释放
- 不支持手动解锁
- 适用于短时间加锁的场景
基本用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #include <iostream> #include <mutex> #include <thread>
std::mutex mtx;
void printMessaga(const std::string& message) { std::lock_guard<std::mutex> lock(mtx); std::cout << message << std::endl; }
int main() { std::thread t1(printMessaga, "Hello from thread 1"); std::thread t2(printMessaga, "Hello from thread 2");
t1.join(); t2.join();
return 0; }
|
构造函数:
会在构造函数时自动加锁,如果不想立刻加锁,可以使用std::adopt_lock
1 2
| std::mutex mtx; std::lock_guard<std::mutex> lock(mtx, std::adopt_lock);
|
此时mtx必须在lock_guard之前已经被手动lock(),否则会导致未定义行为
unique_lock
概述:
一种互斥锁管理器,比std::lock_guard更加灵活,主要用于管理std::mutex或其他BasicLockable类型的互斥锁
主要特点:
- 支持延迟锁定:std::unique_lock可以在构造时不立即获取锁,而是在稍后在需要时显式地lock()
- 支持显式解锁:可以在持有锁的情况下调用unlock()释放锁,而不像std::lock_guard那样必须等到作用域结束时释放锁
- 支持所有权的转移:可以同故宫std::move进行所有权的转移
直接锁定互斥锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| #include <iostream> #include <mutex> #include <thread>
std::mutex mtx;
void threadFunc() { std::unique_lock<std::mutex> lock(mtx); std::cout << "Thread " << std::this_thread::get_id() << " is working\n"; }
int main() { std::thread t1(threadFunc); std::thread t2(threadFunc);
t1.join(); t2.join(); }
|
延迟锁定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #include <iostream> #include <mutex> #include <thread>
std::mutex mtx;
void threadFunc() { std::unique_lock<std::mutex> lock(mtx, std::defer_lock); std::this_thread::sleep_for(std::chrono::milliseconds(100)); lock.lock(); std::cout << "Thread" << std::this_thread::get_id() << " is working\n"; }
int main() { std::thread t1(threadFunc); std::thread t2(threadFunc);
t1.join(); t2.join(); }
|
显式解锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| #include <iostream> #include <mutex> #include <thread>
std::mutex mtx;
void threadFunc() { std::unique_lock<std::mutex> lock(mtx); std::cout << "Thread " << std::this_thread::get_id() << " acquired lock\n";
lock.unlock(); std::cout << "Thread " << std::this_thread::get_id() << " released lock\n";
}
int main() { std::thread t1(threadFunc); std::thread t2(threadFunc);
t1.join(); t2.join(); }
|
条件变量
概述:
允许线程在不满足条件时挂起等待,在条件满足时被唤醒继续执行。通常与互斥锁一起使用,确保在修改共享数据不会出现竞态条件,常见的条件变量有std::condition_variable和std::condition_variable_any
condition_variable
概述:
在多线程环境下,通常会遇到生产者-消费者问题:线程A需要等数据准备后再执行,但它不能一直忙等,线程B生成数据后需要通知线程A继续执行。而std::condition_variable允许一个线程等待某个条件变为真,而另一个线程可以通知它条件已满足
主要方法:
- wait(std::unique_lock<std::mutex>&
lock):让当前线程等待,直到被notify_one()或notify_all()唤醒
- wait(std::unique_lock<std::mutex>& lock, Predicate
pred):和上面的方法类似,但会在等待前和被唤醒后检查pred是否为true
- notify_one():唤醒一个等待的线程
- notify_all():唤醒所有等待的线程
基本用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| #include <iostream> #include <condition_variable> #include <mutex> #include <thread> #include <queue>
std::queue<int> dataQueue; std::mutex mtx; std::condition_variable cv; bool ready = false;
void producer() { for (int i = 1; i <= 5; ++i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::lock_guard<std::mutex> lock(mtx); dataQueue.push(i); ready = true; std::cout << "Produced " << i << std::endl; cv.notify_one(); } }
void consumer() { while (true) { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, [] { return ready || !dataQueue.empty(); });
while (!dataQueue.empty()) { int value = dataQueue.front(); dataQueue.pop(); std::cout << "Consumed: " << value << std::endl; }
ready = false; } }
int main() { std::thread t1(producer); std::thread t2(consumer);
t1.join(); t2.join();
return 0; }
|
实战练习:
多线程顺序打印数字:创建3个线程,按顺序依次打印1-100,即线程1打印1,线程2打印2,线程3打印3,然后线程1打印4,线程2打印5,线程3打印6...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| #include <iostream> #include <thread> #include <mutex> #include <condition_variable>
int current = 1; std::mutex mtx; std::condition_variable cv;
void printNumber(int threadId) { while (true) { std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, [threadId] { return current > 100 || (current - 1) % 3 == threadId; });
if (current > 100) { break; }
std::cout << "Thread" << threadId << " prints " << current << std::endl; current++;
cv.notify_all(); } }
int main() { std::thread t1(printNumber, 0); std::thread t2(printNumber, 1); std::thread t3(printNumber, 2);
t1.join(); t2.join(); t3.join();
return 0; }
|
condition_variable_any
概述:
跟condition_variable类似,但是比其更灵活。condition_variable只能和std::unique_lock<std::mutex>搭配,而condition_variable_any可以与任何满足Lockable概念的锁搭配使用,适用于自定义锁、std::shared_mutex等,但性能就差了一些
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| #include <iostream> #include <mutex> #include <condition_variable> #include <thread> #include <shared_mutex>
std::condition_variable_any cv_any; std::shared_mutex shared_mtx; int data = 0; bool ready = false;
void producer() { std::this_thread::sleep_for(std::chrono::seconds(1)); std::unique_lock<std::shared_mutex> lock(shared_mtx); data = 42; ready = true; std::cout << "Producer: Data ready\n"; cv_any.notify_one(); }
void consumer() { std::unique_lock<std::shared_mutex> lock(shared_mtx); cv_any.wait(lock, [] { return ready; }); std::cout << "Consumer Data: " << data << std::endl; }
int main() { std::thread t1(producer); std::thread t2(consumer);
t1.join(); t2.join();
return 0; }
|
信号量
概述:
是一个计数器,用于控制多个线程对于共享资源的访问。信号量包括二进制信号量和计数信号量。常见的信号量有std::binary_semaphore(C++20引入)和std::couting_semaphore<N>
binary_semaphore
概述:
类似于std::mutex,也具有非阻塞的try_acquire()(mutex是try_lock),但是可以手动释放(不像mutex只能由持有锁的线程释放)
主要特点:
- 值域仅为0或1:
- 0:表示资源不可用,线程阻塞等待
- 1:表示资源可用,允许线程访问
- 支持手动release():不像mutex由持有者释放,binary_semaphore可以由任何线程释放
- 支持try_acquire():非阻塞获取信号量
- 适用于控制访问:适用于单个资源的独占访问,比mutex更灵活
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #include <iostream> #include <thread> #include <semaphore>
std::binary_semaphore sem(0);
void worker() { std::cout << "Worker: Waiting for signal...\n"; sem.acquire(); std::cout << "Worker: Acquired semaphore! Doing work...\n"; }
int main() { std::thread t(worker);
std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "Main: Releasing semaphore\n"; sem.release();
t.join(); return 0; }
|
counting_semaphore<N>
概述: 允许多个线程同时获取资源
主要特点:
- 允许最多N个线程同时访问
- acquire()减少信号量(如果为0,阻塞)
- release()增加信号量(最多到N)
- try_acquire()非阻塞获取
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include <iostream> #include <thread> #include <semaphore>
std::counting_semaphore<3> sem(3);
void worker(int id) { sem.acquire(); std::cout << "Thread " << id << " is working...\n"; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << "Thread " << id << " done.\n"; sem.release(); }
int main() { std::thread threads[5];
for (int i = 0; i < 5; ++i) { threads[i] = std::thread(worker, i); }
for (auto& t : threads) { t.join(); }
return 0; }
|
屏障
概述:
用于在多个线程中所有线程都达到某个点时进行同步,然后继续执行。常见的屏障有std::barrier(C++20引入)
主要特点:
- 需要设置固定数量的线程(称为count)
- 线程到达barrier后必须等待,直到所有线程都到达
- 线程全部到达,自动解除barrier,所有线程继续执行
- 支持阶段同步,可以在每轮barrier完成后执行回调函数
主要方法:
- barrier(std::ptrdiff_t count, CompletionFunction f =
{}):创建barrier,count是线程数,f是可选的阶段回调
- arrive_and_wait():线程到达barrier并等待,直到所有线程到达
- arrive_and_drop():线程达到后退出屏障,减少屏障计数
- arrive(n):线程到达n次(不等待)
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| #include <iostream> #include <thread> #include <barrier>
std::barrier syncPoint(3);
void worker(int id) { std::cout << "Thread " << id << " waiting at barrier...\n"; syncPoint.arrive_and_wait(); std::cout << "Thread " << id << " passed the barrier!\n"; }
int main() { std::thread t1(worker, 1); std::thread t2(worker, 2); std::thread t3(worker, 3);
t1.join(); t2.join(); t3.join();
return 0; }
|
带回调函数的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include <iostream> #include <thread> #include <barrier>
void onPhaseComplete() { std::cout << "All threads reached the barrier. Proceeding to next phase...\n"; }
std::barrier syncPoint(3, onPhaseComplete);
void worker(int id) { std::cout << "Thread " << id << " waiting at barrier...\n"; syncPoint.arrive_and_wait(); std::cout << "Thread " << id << " passed the barrier!\n"; }
int main() { std::thread t1(worker, 1); std::thread t2(worker, 2); std::thread t3(worker, 3);
t1.join(); t2.join(); t3.join();
return 0; }
|
arrive_and_drop()让一个线程退出barrier:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| #include <iostream> #include <thread> #include <barrier>
std::barrier syncPoint(3);
void worker(int id) { if (id == 1) { std::cout << "Thread " << id << " leaving the barrier parmanently...\n"; syncPoint.arrive_and_drop(); return; }
std::cout << "Thread " << id << " waiting at barrier...\n"; syncPoint.arrive_and_wait(); std::cout << "Thread " << id << " passed the barrier!\n"; }
int main() { std::thread t1(worker, 1); std::thread t2(worker, 2); std::thread t3(worker, 3);
t1.join(); t2.join(); t3.join();
return 0; }
|
原子操作
概述:
是一种保证不会被中断的操作。常见的原子操作有std::atomic和C++11中引入的原子操作函数
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #include <iostream> #include <atomic> #include <thread>
std::atomic<int> counter(0);
void increment() { for (int i = 0; i < 1000; ++i) { counter.fetch_add(1, std::memory_order_relaxed); } }
int main() { std::thread t1(increment); std::thread t2(increment);
t1.join(); t2.join();
std::cout << "Final counter: " << counter.load() << std::endl; return 0; }
|
支持的类型:
主要方法:
- load():获取原子变量的值
- store(x):设置原子变量的值
- fetch_add(x):原子加法
- fetch_sub(x):原子减法
- fetch_or(x):原子或
- fetch_and(x):原子与
- exchange(x):原子赋值并返回旧值
- compare_exchange_weak(x, y):CAS操作,比较并交换值
std::memory_order内存模型:
- memory_order_relaxed:只保证原子性,不保证顺序
- memory_order_consume:依赖load()的指令不会被重排序(很少使用)
- memory_order_acquire:读取前的所有读写不会被重排序
- memory_order_release:释放前的所有读写不会被重排序
- memory_order_acq_rel:组合acquire和release
- memory_order_seq_cst:默认,保证全局顺序一致
读写锁
概述:
允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。常见的读写锁有std::shared_mutex(C++17引入)
基础用法示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| x#include <iostream> #include <thread> #include <shared_mutex> #include <vector> #include <mutex>
std::shared_mutex rwMtx; int sharedData = 0;
void reader(int id) { std::shared_lock lock(rwMtx); std::cout << "Reader " << id << " read value: " << sharedData << std::endl; }
void writer(int id, int value) { std::unique_lock lock(rwMtx); sharedData = value; std::cout << "Writer " << id << " updated value to: " << sharedData << std::endl; }
int main() { std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) { threads.emplace_back(reader, i); }
threads.emplace_back(writer, 1, 100);
for (int i = 5; i < 10; ++i) { threads.emplace_back(reader, i); }
for (auto& t : threads) { t.join(); }
return 0; }
|