互斥锁 + 条件变量
信号量
哲学家吃饭
线程同步
同步 (Synchronization)
- 两个或两个以上随时间变化的量在变化过程中保持一定的相对关系
- iPhone/iCloud 同步 (手机 vs 电脑 vs 云端)
- 变速箱同步器 (合并快慢速齿轮)
- 同步电机 (转子与磁场速度一致)
- 同步电路 (所有触发器在边沿同时触发)
异步 (Asynchronous) = 不同步
- 上述很多例子都有异步版本 (异步电机、异步电路、异步线程)
并发程序中的同步:并发程序的步调很难保持 “完全一致”
- 线程同步:在某个时间点共同达到互相已知的状态
生产者-消费者问题
99% 的实际并发问题都可以用生产者-消费者解决。
一种题目描述 ()问题
在 printf 前后增加代码,使得打印的括号序列满足
- 一定是某个合法括号序列的前缀
- 括号嵌套的深度不超过 n
- n=3, ((())())((( 合法
- n=3, (((()))), (())) (右括号多了一个) 不合法
同步
- 等到有空位再打印左括号
- 等到能配对时再打印右括号
分析
- 共享的缓冲区:终端(包)
- 将’(‘ 放入”包”中。’)’是取出资源。
- 资源数量即 ‘(‘的数量
互斥锁实现
- 能否用互斥锁实现括号问题?
- 左括号:嵌套深度 (队列) 不足 n 时才能打印
- 右括号:嵌套深度 (队列) >1 时才能打印
- 当然是等到满足条件时再打印了:pc.c
- 用互斥锁保持条件成立
- 当然是等到满足条件时再打印了:pc.c
- wrong code :应当用while代替if ,防止消费者唤醒消费者,生产者唤醒生产者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
int cnt = 0; // 资源数量
mutex_t lk = MUTEX_INIT(); // futex锁 即 封装的POSIX的pthread_mutex_t 在thread_sync中
int capacity = N; // 资源区大小
void Tproduce()
{
while(1)
{
retry:
// lock
mutex_lock(&lk); // futex 获得锁失败:进入kernel 调度到其他thread。获得锁成功:不进入kernel
// 资源区满了 不再加入资源
if(cnt == capacity)
{
// unlock
mutex_unlock(&lk);
goto retry;
}
// 放入资源
++cnt;
printf("(");
// 在printf的时候不需要上锁 因为printf本身就会获得终端的锁 不必担心会打印乱码
// unlock
mutex_unlock(&lk);
}
}
void Tconsume()
{
while(1)
{
retry:
mutex_lock(&lk);
if(cnt == 0)
{
mutex_unlock(&lk);
goto retry;
}
// 取出资源
--cnt;
printf(")");
// unlock
mutex_unlock(&lk);
}
}
// main
vector<thread> produces;
vector<thread> consumers;
for(int i=0;i<8;++i)
{
produces.emplace_back(Tproduce);
consumers.emplace_back(Tconsume);
}
互斥锁 + 条件变量实现
- 使用条件变量比起只用互斥量的好处:
- cond不满足时,用阻塞代替while空转。
信号量实现
条件变量
- 引入:while空转太浪费cpu了,不如阻塞,直到cond满足。
API
- wait(cv, mutex) 💤
- 调用时必须保证已经获得 mutex
- 释放 mutex、进入睡眠状态
- signal/notify(cv) 💬 私信:走起
- 如果有线程正在等待 cv,则唤醒其中一个线程
- broadcast/notifyAll(cv) 📣 所有人:走起
- 唤醒全部正在等待 cv 的线程
wrong code
c
1
2
3
4
5
6
7
8
9
10
11
12
13void Tproduce() {
mutex_lock(&lk);
if (count == n) cond_wait(&cv, &lk);
printf("("); count++; cond_signal(&cv);
mutex_unlock(&lk);
}
void Tconsume() {
mutex_lock(&lk);
if (count == 0) cond_wait(&cv, &lk);
printf(")"); count--; cond_signal(&cv);
mutex_unlock(&lk);
}当producer 或者 consumer的thread数量大于1时,就会发生错误。
- 因为生产者可能会唤醒生产者 / 消费者可能会唤醒消费者
- ```c
(())(()()()())((()))()(())(()(())))))()())))))))))))))))))))))))))))))))))))))))))))))))))))))))))))shc@DESKTOP-TVUERHD:~/Code/try/lock$shc@DESKTOP-TVUERHD:~/Code/try/lock$ ./pc2 2 | head -c 100
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16- 解决:if改成while
### 万能的正确使用姿势
- **需要等待条件满足时**
```c
mutex_lock(&mutex);
while (!cond) {
wait(&cv, &mutex);
}
// 无论thread如何并发 while都保证了出循环时一定满足cond
assert(cond);
// ...
// 互斥锁保证了在此期间条件 cond 总是成立
// ...
mutex_unlock(&mutex);
- 因为生产者可能会唤醒生产者 / 消费者可能会唤醒消费者
其他线程条件可能被满足时
1
broadcast(&cv);
应用
- 观点:为了防止生产者唤醒生产者 ; 消费者唤醒消费者. 可以使用两个条件变量 ?
- 一个是notFullCond 一个是notEmptyCond
- 生产者等待notFullCond , 唤醒notEmptyCond
- 消费者等待NotEmptyCond , 唤醒notFullCond
- 不过这个也算不了什么优化吧 ? 生产者唤醒生产者又能怎样。顶多是多生产一些资源燃火再让消费者消费呗
- 一个是notFullCond 一个是notEmptyCond
生产者-消费者
- ()问题正确源代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int cnt = 0; // 资源数量
mutex_t lk = MUTEX_INIT(); // futex锁
cond_t cv = COND_INIT(); // 条件变量
int capacity = N; // 资源区大小
void Tproduce()
{
while(1)
{
// lock
mutex_lock(&lk);
while( ! (cnt != capacity) ) // !cond
{
// unblock and sleep
cond_wait(&cv,&lk);
}
// 出循环时一定满足cond
assert(cnt!=capacity);
// 放入资源
++cnt;
printf("(");
// unlock
mutex_unlock(&lk);
// 全部wakeup。防止 一些情况下生产者唤醒生产者 或者 consunmer唤醒consumer产生的bug。
cond_broadcast(&cv);
}
}
void Tconsume()
{
while(1)
{
mutex_lock(&lk);
while(!(cnt!=0))
{
cond_wait(&cv,&lk);
}
assert(cnt != 0);
--cnt;
printf(")");
mutex_unlock(&lk);
cond_broadcast(&cv);
}
}
实现并行计算
- 利用条件变量的万能姿势实现并行计算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18struct job {
void (*run)(void *arg);
void *arg;
}
while (1) {
struct job *job;
mutex_lock(&mutex);
while (! (job = get_job()) ) { // cond :job不为空
wait(&cv, &mutex);
}
mutex_unlock(&mutex);
job->run(job->arg); // 不需要持有锁
// 可以生成新的 job
// 注意回收分配的资源
}
<>_ (例题)
所以有了这个万能算法就很简单了阿,可以简单的解决奇怪的并发问题
- 用条件变量解决问题只要回答一个问题:while循环的条件是什么,也即,我们的cond是什么?
- 还需要画出状态机,在什么情况下可以做出什么动作 然后转移到什么状态。
例题如下 使用锁 + 条件变量 + 状态机。
题目:打印出<>_ 或者 ><>_
状态机
fish.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
enum { A = 1, B, C, D, E, F, };
struct rule {
int from, ch, to;
};
struct rule rules[] = {
{ A, '<', B },
{ B, '>', C },
{ C, '<', D },
{ A, '>', E },
{ E, '<', F },
{ F, '>', D },
{ D, '_', A },
};
// state quota是为了保证打印完之后可以原子的改变状态 再切换到其他thread上、可以优化掉。
int current = A, quota = 1;
pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 根据current 和 ch 获取下一state
int next(char ch) {
for (int i = 0; i < LENGTH(rules); i++) {
struct rule *rule = &rules[i];
if (rule->from == current && rule->ch == ch) {
return rule->to;
}
}
return 0;
}
void fish_before(char ch) {
pthread_mutex_lock(&lk);
while (!(next(ch) && quota)) { // next(ch) 是保证下一状态是状态机中存在的状态 非法的话就返回0,但不一定符合逻辑。所以还应当加一个计数的cnt,防止打印 < 的thread 执行完before之后切换到另一个打印<的thread。加上的cnt 保证了执行完after之后 才会切换到其他thread
// can proceed only if (next(ch) && quota)
pthread_cond_wait(&cond, &lk);
}
quota--; // 这是为了保证 state 改变之后 再切换到下一个 thread 。如果没有这个保证的话,可能putchar打印完<之后,CPU可能切换到另一个thread(比如另一个打印>的thread的before函数)但是,我们打印完之后并没有改变current(state),因此就会发生逻辑错误导致死锁。 其实可以优化掉
pthread_mutex_unlock(&lk);
}
void fish_after(char ch) {
pthread_mutex_lock(&lk);
quota++;
current = next(ch); // state转化
assert(current); // 确保state合法性。否则逻辑错误 终止程序
pthread_cond_broadcast(&cond); // 唤醒所有等待在cond的thread
pthread_mutex_unlock(&lk);
}
const char roles[] = ".<<<<<>>>>___";
void fish_thread(int id) {
char role = roles[id];
while (1) {
fish_before(role); // 根据state和本thread要打印的ch,阻塞到 一定时刻 再进行打印。
putchar(role); // can be long; no lock protection // 在putchar的时候不需要上锁 因为putchar本身就会获得终端的锁 不必担心多个thread同时打印时 会打印乱码
// printf("%c current = %d\n",ch,current);
fish_after(role); // 唤醒
}
}
int main() {
setbuf(stdout, NULL);
for (int i = 0; i < strlen(roles); i++)
create(fish_thread);
}
shc@DESKTOP-TVUERHD:~/Code/try/lock/jyy$ gcc fish.c -o fish -Wall -lpthread && ./fish
<><_<><_<><_><>_<><_><>_<><_<><_><>_<><_><>_<><_><>_><>_><>_<><_><>_<><_><>_><>_><>_<><_<><_<><_><>_...
- 简化:myFish.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
enum { A = 1, B, C, D, E, F, };
struct rule {
int from, ch, to;
};
struct rule rules[] = {
{ A, '<', B },
{ B, '>', C },
{ C, '<', D },
{ A, '>', E },
{ E, '<', F },
{ F, '>', D },
{ D, '_', A },
};
int current = A;
pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int next(char ch) {
for (int i = 0; i < LENGTH(rules); i++) {
struct rule *rule = &rules[i];
if (rule->from == current && rule->ch == ch) {
return rule->to;
}
}
return 0;
}
void fish(char ch) {
pthread_mutex_lock(&lk);
// while(!cond){}
while (!next(ch)) { // next(ch) 是保证下一状态是状态机中存在的状态 仅仅保证非0,而不一定符合逻辑。所以还应当加一个计数的cnt,防止打印 < 的thread 执行完before之后切换到另一个打印<的thread。加上的cnt 保证了执行完after之后 才会切换到其他thread
pthread_cond_wait(&cond, &lk); // wait
}
putchar(ch);
current = next(ch); // state前进
assert(current); // check 确保state合法性。否则逻辑错误 终止程序
pthread_cond_broadcast(&cond); // wakeup
pthread_mutex_unlock(&lk);
}
const char roles[] = ".<<<<<>>>>___";
// const char roles[] = ".<>_";
void fish_thread(int id) {
char role = roles[id];
while (1) {
fish(role); // 根据state和本thread要打印的ch,阻塞到什么时候再进行打印。
}
}
int main() {
setbuf(stdout, NULL);
for (int i = 0; i < strlen(roles); i++)
create(fish_thread);
}
shc@DESKTOP-TVUERHD:~/Code/try/lock/jyy$ gcc myFish.c -o myFish -Wall -lpthread && ./myFish | head -c 10000
<><_<><_><>_<><_><>_<><_><>_<><_<><_<><_><>_><>_<><_<><_><>_<><_<><_><>_><>_<><_<><_<><_<><_><>_><>_><>_<><_<><_<><_<><_<><_<><_...
H2O
- 同上。提交记录中有cpp。照着jyy教的写就行了。
- lc H2O
- 用了两种。一种比较简化。不过都是锁 + 条件变量 + 状态机
信号量 - 带计数器的锁(扩展的锁)
生产者-消费者
- 信号量设计的重点
- 考虑 “手环” (每一单位的 “资源”) 是什么,谁创造?谁获取?
- code : 比使用条件变量更加简洁
- 使用两个信号量
- 在 “一单位资源” 明确的问题上更好用
- 还是推荐用条件变量。信号量需要更加小心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20void producer() {
P(&empty); // P()返回 -> 得到手环
printf("("); // 假设线程安全
V(&fill);
}
void consumer() {
P(&fill);
printf(")");
V(&empty);
}
int main(int argc, char *argv[]) {
assert(argc == 2);
SEM_INIT(&fill, 0);
SEM_INIT(&empty, atoi(argv[1]));
for (int i = 0; i < 8; i++) {
create(producer);
create(consumer);
}
}
哲学家吃饭
- 哲学家 (线程) 有时思考,有时吃饭
- 吃饭需要同时得到左手和右手的叉子
- 当叉子被其他人占有时,必须等待,如何完成同步?
- 如何用互斥锁/信号量实现?
- 套用生产者消费者模型
信号量
- 写不好的话可能会出现 同时所有哲学家都拿起左手边的叉子,而陷入dead lock
- fail code
- 有可能一个thread P1之后 又切到另一个thread P2 … 最后每个thread只有一个叉子。循环等待。故死锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14void Tphilosopher(int id) {
int lhs = (N + id - 1) % N;
int rhs = id % N;
while (1) {
P(&locks[lhs]);
printf("T%d Got %d\n", id, lhs + 1);
P(&locks[rhs]);
printf("T%d Got %d\n", id, rhs + 1);
V(&locks[lhs]);
V(&locks[rhs]);
}
}
top
1613 shc 20 0 92768 556 472 S 0.0 0.0 0:00.00 philosopher
- 有可能一个thread P1之后 又切到另一个thread P2 … 最后每个thread只有一个叉子。循环等待。故死锁。
万能的条件变量
- 资源是叉子
- 临界区是叉子。
- 各个线程互斥的处理叉子资源(即在处理资源时各个thread是串行的)
- 拿到资源后的thread 在吃饭的时候是并行的。
- code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// 资源:叉子 avail
// 通过互斥锁的原子性保证了处理资源时的互斥性。
// 同一时刻拿到锁的thread只有一个
// 同一时刻load、store资源的thread只有一个
// 也即保证了load和store的原子性
mutex_lock(&mutex); // atomic
while (!(avail[lhs] && avail[rhs])) {
wait(&cv, &mutex);
}
// 同一时刻 只有一个thread 会在这里
// 取资源
avail[lhs] = avail[rhs] = false;
mutex_unlock(&mutex);
// eat
mutex_lock(&mutex);
// 同一时刻 只有一个thread 会在这里
// 放回资源
avail[lhs] = avail[rhs] = true;
broadcast(&cv); // 唤醒别人,条件满足
mutex_unlock(&mutex);
Leader/Follower - 生产者/消费者
- 分布式常见的解决思路(HDFS)
- Leader : 服务员
- Follower : 哲学家
- 让一个人(服务员)集中的管理资源(叉子)
- 这个人可以知道资源(叉子)的情况,比如谁霸占了叉子,谁用叉子用了多长时间
- 而非使用信号量(分布的,让每个人自己去争抢)
- Twaiter可能成为瓶颈:一大桌人吃饭,每个人都叫服务员的感觉
- 抛开 workload 谈优化就是耍流氓
- 吃饭的时间通常远远大于请求服务员的时间
- 如果一个 manager 搞不定,可以分多个 (fast/slow path)
- 把系统设计好,使集中管理不成为瓶颈
- code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// follower
void Tphilosopher(int id) {
send_request(id, EAT); // 请求资源
P(allowed[id]); // waiter 会把叉子递给哲学家
philosopher_eat();
send_request(id, DONE); // 回收资源
}
// leader 处理follower发来的请求 -分配资源/回收资源
void Twaiter() {
while (1) {
(id, status) = receive_request();
if (status == EAT) { ... }
if (status == DONE) { ... }
}
}