不落辰

知不可乎骤得,托遗响于悲风

0%

操作系统-并发03-同步控制

互斥锁 + 条件变量
信号量
哲学家吃饭

线程同步

  • 同步 (Synchronization)

    • 两个或两个以上随时间变化的量在变化过程中保持一定的相对关系
    • iPhone/iCloud 同步 (手机 vs 电脑 vs 云端)
    • 变速箱同步器 (合并快慢速齿轮)
    • 同步电机 (转子与磁场速度一致)
    • 同步电路 (所有触发器在边沿同时触发)
  • 异步 (Asynchronous) = 不同步

    • 上述很多例子都有异步版本 (异步电机、异步电路、异步线程)
  • 并发程序中的同步:并发程序的步调很难保持 “完全一致”

    • 线程同步:在某个时间点共同达到互相已知的状态

生产者-消费者问题

99% 的实际并发问题都可以用生产者-消费者解决。

一种题目描述 ()问题

  • 在 printf 前后增加代码,使得打印的括号序列满足

    • 一定是某个合法括号序列的前缀
    • 括号嵌套的深度不超过 n
    • n=3, ((())())((( 合法
    • n=3, (((()))), (())) (右括号多了一个) 不合法
  • 同步

    • 等到有空位再打印左括号
    • 等到能配对时再打印右括号
  • 分析

    • 共享的缓冲区:终端(包)
    • 将’(‘ 放入”包”中。’)’是取出资源。
    • 资源数量即 ‘(‘的数量

互斥锁实现

  • 能否用互斥锁实现括号问题
    • 左括号:嵌套深度 (队列) 不足 n 时才能打印
    • 右括号:嵌套深度 (队列) >1 时才能打印
      • 当然是等到满足条件时再打印了:pc.c
        • 用互斥锁保持条件成立
  • wrong code :应当用while代替if ,防止消费者唤醒消费者,生产者唤醒生产者。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    #include<thread>                //  仅仅是为了用c++11的thread类 更方便
    #include<pthread.h>
    #include "thread-sync.h" // 实现了spinlock , 封装了 POSIX的mutex(futex),封装了条件变量还

    int cnt = 0; // 资源数量
    mutex_t lk = MUTEX_INIT(); // futex锁 即 封装的POSIX的pthread_mutex_t 在thread_sync中
    int capacity = N; // 资源区大小
    void Tproduce()
    {
    while(1)
    {
    retry:
    // lock
    mutex_lock(&lk); // futex 获得锁失败:进入kernel 调度到其他thread。获得锁成功:不进入kernel
    // 资源区满了 不再加入资源
    if(cnt == capacity)
    {
    // unlock
    mutex_unlock(&lk);
    goto retry;
    }
    // 放入资源
    ++cnt;
    printf("(");
    // 在printf的时候不需要上锁 因为printf本身就会获得终端的锁 不必担心会打印乱码
    // unlock
    mutex_unlock(&lk);
    }
    }

    void Tconsume()
    {
    while(1)
    {
    retry:
    mutex_lock(&lk);
    if(cnt == 0)
    {
    mutex_unlock(&lk);
    goto retry;
    }
    // 取出资源
    --cnt;
    printf(")");
    // unlock
    mutex_unlock(&lk);
    }
    }

    // main
    vector<thread> produces;
    vector<thread> consumers;
    for(int i=0;i<8;++i)
    {
    produces.emplace_back(Tproduce);
    consumers.emplace_back(Tconsume);
    }

互斥锁 + 条件变量实现

  • 使用条件变量比起只用互斥量的好处
    • cond不满足时,用阻塞代替while空转

信号量实现

条件变量

  • 引入:while空转太浪费cpu了,不如阻塞,直到cond满足。

API

  • wait(cv, mutex) 💤
    • 调用时必须保证已经获得 mutex
    • 释放 mutex、进入睡眠状态
  • signal/notify(cv) 💬 私信:走起
    • 如果有线程正在等待 cv,则唤醒其中一个线程
  • broadcast/notifyAll(cv) 📣 所有人:走起
    • 唤醒全部正在等待 cv 的线程

wrong code

  • c

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    void Tproduce() {
    mutex_lock(&lk);
    if (count == n) cond_wait(&cv, &lk);
    printf("("); count++; cond_signal(&cv);
    mutex_unlock(&lk);
    }

    void Tconsume() {
    mutex_lock(&lk);
    if (count == 0) cond_wait(&cv, &lk);
    printf(")"); count--; cond_signal(&cv);
    mutex_unlock(&lk);
    }
  • 当producer 或者 consumer的thread数量大于1时,就会发生错误。

    • 因为生产者可能会唤醒生产者 / 消费者可能会唤醒消费者
    • ```c
      shc@DESKTOP-TVUERHD:~/Code/try/lock$ ./pc2 2 | head -c 100
      
      (())(()()()())((()))()(())(()(())))))()())))))))))))))))))))))))))))))))))))))))))))))))))))))))))))shc@DESKTOP-TVUERHD:~/Code/try/lock$
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      - 解决:if改成while

      ### 万能的正确使用姿势

      - **需要等待条件满足时**
      ```c
      mutex_lock(&mutex);
      while (!cond) {
      wait(&cv, &mutex);
      }
      // 无论thread如何并发 while都保证了出循环时一定满足cond
      assert(cond);
      // ...
      // 互斥锁保证了在此期间条件 cond 总是成立
      // ...
      mutex_unlock(&mutex);
  • 其他线程条件可能被满足时

    1
    broadcast(&cv);

应用

  • 观点:为了防止生产者唤醒生产者 ; 消费者唤醒消费者. 可以使用两个条件变量 ?
    • 一个是notFullCond 一个是notEmptyCond
      • 生产者等待notFullCond , 唤醒notEmptyCond
      • 消费者等待NotEmptyCond , 唤醒notFullCond
    • 不过这个也算不了什么优化吧 ? 生产者唤醒生产者又能怎样。顶多是多生产一些资源燃火再让消费者消费呗

生产者-消费者

  • ()问题正确源代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    #include "thread-sync.h"        //  实现了spinlock , 封装了 POSIX的mutex(futex),封装了条件变量还

    int cnt = 0; // 资源数量
    mutex_t lk = MUTEX_INIT(); // futex锁
    cond_t cv = COND_INIT(); // 条件变量

    int capacity = N; // 资源区大小


    void Tproduce()
    {
    while(1)
    {
    // lock
    mutex_lock(&lk);
    while( ! (cnt != capacity) ) // !cond
    {
    // unblock and sleep
    cond_wait(&cv,&lk);
    }
    // 出循环时一定满足cond
    assert(cnt!=capacity);

    // 放入资源
    ++cnt;
    printf("(");

    // unlock
    mutex_unlock(&lk);

    // 全部wakeup。防止 一些情况下生产者唤醒生产者 或者 consunmer唤醒consumer产生的bug。
    cond_broadcast(&cv);
    }
    }

    void Tconsume()
    {
    while(1)
    {
    mutex_lock(&lk);

    while(!(cnt!=0))
    {
    cond_wait(&cv,&lk);
    }
    assert(cnt != 0);

    --cnt;
    printf(")");

    mutex_unlock(&lk);

    cond_broadcast(&cv);
    }
    }

实现并行计算

  • 利用条件变量的万能姿势实现并行计算
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    struct job {
    void (*run)(void *arg);
    void *arg;
    }

    while (1) {
    struct job *job;

    mutex_lock(&mutex);
    while (! (job = get_job()) ) { // cond :job不为空
    wait(&cv, &mutex);
    }
    mutex_unlock(&mutex);

    job->run(job->arg); // 不需要持有锁
    // 可以生成新的 job
    // 注意回收分配的资源
    }

<>_ (例题)

  • 所以有了这个万能算法就很简单了阿,可以简单的解决奇怪的并发问题

    • 用条件变量解决问题只要回答一个问题:while循环的条件是什么,也即,我们的cond是什么
    • 还需要画出状态机,在什么情况下可以做出什么动作 然后转移到什么状态。
  • 例题如下 使用锁 + 条件变量 + 状态机

  • 题目:打印出<>_ 或者 ><>_

  • 状态机

  • fish.c

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    #include "thread.h"

    #define LENGTH(arr) (sizeof(arr) / sizeof(arr[0]))

    enum { A = 1, B, C, D, E, F, };

    struct rule {
    int from, ch, to;
    };

    struct rule rules[] = {
    { A, '<', B },
    { B, '>', C },
    { C, '<', D },
    { A, '>', E },
    { E, '<', F },
    { F, '>', D },
    { D, '_', A },
    };

    // state quota是为了保证打印完之后可以原子的改变状态 再切换到其他thread上、可以优化掉。
    int current = A, quota = 1;

    pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    // 根据current 和 ch 获取下一state
    int next(char ch) {
    for (int i = 0; i < LENGTH(rules); i++) {
    struct rule *rule = &rules[i];
    if (rule->from == current && rule->ch == ch) {
    return rule->to;
    }
    }
    return 0;
    }

    void fish_before(char ch) {
    pthread_mutex_lock(&lk);
    while (!(next(ch) && quota)) { // next(ch) 是保证下一状态是状态机中存在的状态 非法的话就返回0,但不一定符合逻辑。所以还应当加一个计数的cnt,防止打印 < 的thread 执行完before之后切换到另一个打印<的thread。加上的cnt 保证了执行完after之后 才会切换到其他thread
    // can proceed only if (next(ch) && quota)
    pthread_cond_wait(&cond, &lk);
    }
    quota--; // 这是为了保证 state 改变之后 再切换到下一个 thread 。如果没有这个保证的话,可能putchar打印完<之后,CPU可能切换到另一个thread(比如另一个打印>的thread的before函数)但是,我们打印完之后并没有改变current(state),因此就会发生逻辑错误导致死锁。 其实可以优化掉
    pthread_mutex_unlock(&lk);
    }

    void fish_after(char ch) {
    pthread_mutex_lock(&lk);
    quota++;
    current = next(ch); // state转化
    assert(current); // 确保state合法性。否则逻辑错误 终止程序
    pthread_cond_broadcast(&cond); // 唤醒所有等待在cond的thread
    pthread_mutex_unlock(&lk);
    }

    const char roles[] = ".<<<<<>>>>___";

    void fish_thread(int id) {
    char role = roles[id];
    while (1) {
    fish_before(role); // 根据state和本thread要打印的ch,阻塞到 一定时刻 再进行打印。
    putchar(role); // can be long; no lock protection // 在putchar的时候不需要上锁 因为putchar本身就会获得终端的锁 不必担心多个thread同时打印时 会打印乱码
    // printf("%c current = %d\n",ch,current);
    fish_after(role); // 唤醒
    }
    }

    int main() {
    setbuf(stdout, NULL);
    for (int i = 0; i < strlen(roles); i++)
    create(fish_thread);
    }
    shc@DESKTOP-TVUERHD:~/Code/try/lock/jyy$ gcc fish.c -o fish -Wall -lpthread && ./fish
    <><_<><_<><_><>_<><_><>_<><_<><_><>_<><_><>_<><_><>_><>_><>_<><_><>_<><_><>_><>_><>_<><_<><_<><_><>_...
  • 简化:myFish.c
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    #include "thread.h"

    #include<sys/types.h>
    #include<sys/syscall.h>

    #define LENGTH(arr) (sizeof(arr) / sizeof(arr[0]))

    enum { A = 1, B, C, D, E, F, };

    struct rule {
    int from, ch, to;
    };

    struct rule rules[] = {
    { A, '<', B },
    { B, '>', C },
    { C, '<', D },
    { A, '>', E },
    { E, '<', F },
    { F, '>', D },
    { D, '_', A },
    };
    int current = A;

    pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    int next(char ch) {
    for (int i = 0; i < LENGTH(rules); i++) {
    struct rule *rule = &rules[i];
    if (rule->from == current && rule->ch == ch) {
    return rule->to;
    }
    }
    return 0;
    }


    void fish(char ch) {
    pthread_mutex_lock(&lk);

    // while(!cond){}
    while (!next(ch)) { // next(ch) 是保证下一状态是状态机中存在的状态 仅仅保证非0,而不一定符合逻辑。所以还应当加一个计数的cnt,防止打印 < 的thread 执行完before之后切换到另一个打印<的thread。加上的cnt 保证了执行完after之后 才会切换到其他thread
    pthread_cond_wait(&cond, &lk); // wait
    }

    putchar(ch);
    current = next(ch); // state前进
    assert(current); // check 确保state合法性。否则逻辑错误 终止程序

    pthread_cond_broadcast(&cond); // wakeup
    pthread_mutex_unlock(&lk);
    }


    const char roles[] = ".<<<<<>>>>___";
    // const char roles[] = ".<>_";

    void fish_thread(int id) {
    char role = roles[id];
    while (1) {
    fish(role); // 根据state和本thread要打印的ch,阻塞到什么时候再进行打印。
    }
    }

    int main() {
    setbuf(stdout, NULL);
    for (int i = 0; i < strlen(roles); i++)
    create(fish_thread);
    }


    shc@DESKTOP-TVUERHD:~/Code/try/lock/jyy$ gcc myFish.c -o myFish -Wall -lpthread && ./myFish | head -c 10000
    <><_<><_><>_<><_><>_<><_><>_<><_<><_<><_><>_><>_<><_<><_><>_<><_<><_><>_><>_<><_<><_<><_<><_><>_><>_><>_<><_<><_<><_<><_<><_<><_...

H2O

  • 同上。提交记录中有cpp。照着jyy教的写就行了。
  • lc H2O
    • 用了两种。一种比较简化。不过都是锁 + 条件变量 + 状态机

信号量 - 带计数器的锁(扩展的锁)

  • 封装
    1
    2
    3
    4
    // Semaphore
    #define P sem_wait
    #define V sem_post
    #define SEM_INIT(sem, val) sem_init(sem, 0, val)

    锁与信号量



生产者-消费者

  • 信号量设计的重点
    • 考虑 “手环” (每一单位的 “资源”) 是什么,谁创造?谁获取
  • code : 比使用条件变量更加简洁
    • 使用两个信号量
    • 在 “一单位资源” 明确的问题上更好用
    • 还是推荐用条件变量。信号量需要更加小心
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      void producer() {
      P(&empty); // P()返回 -> 得到手环
      printf("("); // 假设线程安全
      V(&fill);
      }
      void consumer() {
      P(&fill);
      printf(")");
      V(&empty);
      }

      int main(int argc, char *argv[]) {
      assert(argc == 2);
      SEM_INIT(&fill, 0);
      SEM_INIT(&empty, atoi(argv[1]));
      for (int i = 0; i < 8; i++) {
      create(producer);
      create(consumer);
      }
      }

哲学家吃饭

  • 哲学家 (线程) 有时思考,有时吃饭
    • 吃饭需要同时得到左手和右手的叉子
    • 当叉子被其他人占有时,必须等待,如何完成同步?
      • 如何用互斥锁/信号量实现?
      • 套用生产者消费者模型

信号量

  • 写不好的话可能会出现 同时所有哲学家都拿起左手边的叉子,而陷入dead lock
  • fail code
    • 有可能一个thread P1之后 又切到另一个thread P2 … 最后每个thread只有一个叉子。循环等待。故死锁。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      void Tphilosopher(int id) {
      int lhs = (N + id - 1) % N;
      int rhs = id % N;
      while (1) {
      P(&locks[lhs]);
      printf("T%d Got %d\n", id, lhs + 1);
      P(&locks[rhs]);
      printf("T%d Got %d\n", id, rhs + 1);
      V(&locks[lhs]);
      V(&locks[rhs]);
      }
      }
      top
      1613 shc 20 0 92768 556 472 S 0.0 0.0 0:00.00 philosopher

万能的条件变量

  • 资源是叉子
  • 临界区是叉子。
  • 各个线程互斥的处理叉子资源(即在处理资源时各个thread是串行的)
  • 拿到资源后的thread 在吃饭的时候是并行的。
  • code
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    //  资源:叉子 avail
    // 通过互斥锁的原子性保证了处理资源时的互斥性。
    // 同一时刻拿到锁的thread只有一个
    // 同一时刻load、store资源的thread只有一个
    // 也即保证了load和store的原子性
    mutex_lock(&mutex); // atomic
    while (!(avail[lhs] && avail[rhs])) {
    wait(&cv, &mutex);
    }

    // 同一时刻 只有一个thread 会在这里
    // 取资源
    avail[lhs] = avail[rhs] = false;

    mutex_unlock(&mutex);

    // eat

    mutex_lock(&mutex);
    // 同一时刻 只有一个thread 会在这里
    // 放回资源
    avail[lhs] = avail[rhs] = true;

    broadcast(&cv); // 唤醒别人,条件满足
    mutex_unlock(&mutex);

Leader/Follower - 生产者/消费者

  • 分布式常见的解决思路(HDFS)
  • Leader : 服务员
  • Follower : 哲学家
  • 让一个人(服务员)集中的管理资源(叉子)
    • 这个人可以知道资源(叉子)的情况,比如谁霸占了叉子,谁用叉子用了多长时间
    • 而非使用信号量(分布的,让每个人自己去争抢)
  • Twaiter可能成为瓶颈:一大桌人吃饭,每个人都叫服务员的感觉
  • 抛开 workload 谈优化就是耍流氓
    • 吃饭的时间通常远远大于请求服务员的时间
    • 如果一个 manager 搞不定,可以分多个 (fast/slow path)
      • 把系统设计好,使集中管理不成为瓶颈
  • code
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    //  follower
    void Tphilosopher(int id) {
    send_request(id, EAT); // 请求资源
    P(allowed[id]); // waiter 会把叉子递给哲学家
    philosopher_eat();
    send_request(id, DONE); // 回收资源
    }

    // leader 处理follower发来的请求 -分配资源/回收资源
    void Twaiter() {
    while (1) {
    (id, status) = receive_request();
    if (status == EAT) { ... }
    if (status == DONE) { ... }
    }
    }