📂 操作系统 - Operating System / Chapter III 并发 / Section 1 线程 / 1-2 条件变量与信号量

条件变量

2026-05-28
#OS
  • 线程与条件

    • 很多时候,线程需要满足一定条件才可以继续运行:
      • 父线程需要等待子线程执行完毕才可以返回
      • 自旋等待目的满足唤醒线程
    • 线程需要使用条件变量来等待一个条件满足
  • 定义和程序

    • 条件变量(condition variable) 是一个显式队列
    • 当某些执行条件(condition) 不满足时,线程可以把自己加入队列,并等待(waiting) 该条件
    • 另外一个线程改变这个条件时,就可以唤醒一个或多个等待线程
    • 条件变量提供了两个接口:wait()signal()
      • wait()

        • 当线程需要睡眠的时候,就调用wait()
        • wait()需要传入一个锁作为参数,且这个锁已经被锁住了
        • wait()的职责就是释放锁,同时让调用wait()的线程休眠,这是一整个原子操作
        • 当有其他线程调用signal()唤醒当前线程,当前线程才会继续运行,同时会尝试重新持有该锁
      • signal()

        • 当线程想唤醒等待在某个条件变量上的睡眠线程时,就调用signal()
        • 若一个线程在等待某个条件变量,那么当受到signal()唤醒后,它会尝试重新持有相关的锁
  • 生产者/消费者(有界缓冲区)问题

    • 生产者/消费者(producer/consumer)问题,也称为有界缓冲区(bounded-buffer)问题是指:
      • 系统中有一个或多个生产者线程,以及一个或多个消费者线程
      • 生产者会将生成的数据项放入缓冲区
      • 消费者会从缓冲区中取走数据项,并以某种方式消费
    • 有界缓冲区是一片共享资源,所以必须通过同步机制来访问它,以免出现竞态条件
    • 单值缓冲区存入接口put()与取出接口get()

      • 假设缓冲区只有一个单元,缓冲区满时消费者就可以消费资源
      int buffer; // 只有一个单元的缓冲区
      int count = 0; // 缓冲区中的产品数量
      
      /*
          put()函数 提供生产者放入产品的接口
          参数:   value int类型,生产者放入缓冲区的产品
          返回值: 无
      */
      void put(int value) {
          assert(count == 0); // 检查缓冲区是不是已有产品,如果已有产品,程序就会终止
          count = 1; // 更新产品数量
          buffer = value; // 放入产品
      }
      
      /*  
          get()函数 提供消费者取出产品的接口,并返回取出的产品
          参数:   无
          返回值: 消费者取出的产品(int类型)
      */
      int get() {
          assert(count == 1); // 检查缓冲区有没有产品,如果没有产品,程序就会终止
          count = 0; // 更新产品数量
          return buffer; // 取出产品
      }
      
    • 最简单的生产者线程与消费者线程函数

      /*
          producer()函数 生产者线程的行为
          参数:   loops int类型,生产者需要生产的产品数量
          返回值: 无
      */
      void* producer(int loops) {
          for (int i = 0; i < loops; ++i) {   // 循环生产指定数量的产品
              put(i); // 生产者放入产品,产品的值为i
          }
      }
      
      /*
          consumer()函数 消费者线程的行为
          参数:   loops int类型,消费者需要消费的产品数量
          返回值: 无
      */
      void* consumer(int loops) {
          for (int i = 0; i < loops; ++i) {   // 循环取出指定数量的产品
              int value = get(); // 消费者取出产品,将产品存入value变量
              std::cout << "Consumed: " << value << std::endl; // 显示消费的产品
          }
      }
      
    • 有问题的方案

      • 因为put()和get()中存在临界区,所以我们需要锁来隔离两个线程,同时我们还需要条件变量来在恰当的时候唤醒线程
      • 首次尝试
        cond_t cond; // 定义条件变量
        mutex_t mutex; // 定义互斥锁(锁变量)
        
        /*
            producer()函数 生产者线程的行为
            参数:   loops int类型,生产者需要生产的产品数量
            返回值: 无
        */
        void* producer(int loops) {
            for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品
                Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源
                if (count == 1) { // 如果缓冲区有产品,生产者就需要等待
                    Pthread_cond_wait(&cond, &mutex);// 等待条件变量cond,等待时会自动释放mutex锁
                }
                // 等待条件变量完毕,生产者被唤醒,可以放入产品了,此时mutex锁已经被重新获得
                put(i); // 生产者放入产品
                pthread_cond_signal(&cond); // 生产者发出信号
                Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源
            }
        }
        
        /*
            consumer()函数 消费者线程的行为
            参数:   loops int类型,消费者需要消费的产品数量
            返回值: 无
        */
        void* consumer(int loops) {
            for (int i = 0; i < loops; ++i) {   // 循环取出指定数量的产品
                Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源
                if (count == 0) { // 如果缓冲区没有产品,消费者就需要等待
                    Pthread_cond_wait(&cond, &mutex); // 等待条件变量cond,等待时会自动释放mutex锁
                }
                // 等待条件变量完毕,消费者被唤醒,可以取出产品了,此时mutex锁已经被重新获得
                int value = get(); // 消费者取出产品
                std::cout << "Consumer got: " << value << std::endl; // 输出消费者取出的产品
                pthread_cond_signal(&cond); // 消费者发出信号
                Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源
            }
        }
        
      • 这段代码在只有一个生产者线程和一个消费者线程时可以正常运作,但是如果消费者变得更多,它将会出现问题: 假设现在有1个生产者线程p1,和2个消费者线程c1、c2
        1. 首先消费者c1运行,它会获得锁
        2. 然后检查缓冲区是否已满(count==1则为已满)
        3. c1发现缓冲区内没有资源,于是释放锁,并开始等待,直到条件变量将其唤醒
        4. 生产者p1运行,它会获得锁
        5. 然后检查缓冲区是否已满(count==1则为已满)
        6. p1发现count==0,说明缓冲区没有满,于是在缓冲区中放入一个资源,此时buffer被设为该资源的值count被设为1
        7. 然后p1在条件变量cond上发送信号“状态改变了”,于是,现在等待在条件变量cond上的c1被唤醒,于是c1准备开始运行
        8. p1继续运行,直到在for循环体中再一次检查count,发现count为1,于是睡眠
        9. 但是,此时操作系统调度让c2先开始运行。c2一看,发现缓冲区满了,于是把资源取走了
        10. 然后c1开始运行,但是此时缓冲区的资源已经被消费了,而c1不知道这事,所以c1只能错误地取走一个已经被消费的、无效的值
      • 在一个线程通过条件变量发送信号时,该线程只是唤醒了某一个线程,并不能保证该线程立刻开始执行,也就不能保证被唤醒的线程开始运行时一定是它期望的情况
        • 信号的这种释义被称为Mesa语义
        • 与之相对的有Hoare语义,它会保证线程被唤醒时立刻开始执行,但是实现难度很大
        • 几乎所有系统都采用的是Mesa语义
    • 较好但仍有问题的方案:while替代if

      • 通过把if语句改为while语句,线程会在被唤醒时再检查一次count的值,这样可以有效避免上面出现的问题
        /*
            producer()函数 生产者线程的行为
            参数:   loops int类型,生产者需要生产的产品数量
            返回值: 无
        */
        void* producer(int loops) {
            for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品
                Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源
                while (count == 1) { // 如果缓冲区有产品,生产者就需要等待
                    Pthread_cond_wait(&cond, &mutex); // 等待条件变量cond,等待时会自动释放mutex锁
                }
                // 等待条件变量完毕,生产者被唤醒,可以放入产品了,此时mutex锁已经被重新获得
                put(i); // 生产者放入产品
                pthread_cond_signal(&cond); // 生产者发出信号
                Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源
            }
        }
        
        /*
            consumer()函数 消费者线程的行为
            参数:   loops int类型,消费者需要消费的产品数量
            返回值: 无
        */
        void* consumer(int loops) {
            for (int i = 0; i < loops; ++i) {   // 循环取出指定数量的产品
                Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源
                while (count == 0) { // 如果缓冲区没有产品,消费者就需要等待
                    Pthread_cond_wait(&cond, &mutex); // 等待条件变量cond,等待时会自动释放mutex锁
                }
                // 等待条件变量完毕,消费者被唤醒,可以取出产品了,此时mutex锁已经被重新获得
                int value = get(); // 消费者取出产品
                std::cout << "Consumer got: " << value << std::endl; // 输出消费者取出的产品
                pthread_cond_signal(&cond); // 消费者发出信号
                Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源
            }
        }
        
      • 这段代码在出现多个消费者时仍然会出现问题:假设现在有1个生产者线程p12个消费者线程c1、c2
        1. 首先,两个消费者c1、c2依次运行,都发现缓冲区没东西,于是都睡了
        2. 生产者p1开始运行,放入数据,发送信号给c1,之后自己也睡了
        3. 此时c1准备继续运行p1和c2都在同一个条件变量cond上等待
        4. c1继续运行,此时还在while循环体内,于是c1检查count,发现count和预期一样是1,于是退出while循环体
        5. 接下来c1消费缓冲区里的资源,然后在条件变量cond上发送信号,说“状态改变了“,然后自己睡了
        6. 但是此时,条件变量有可能唤醒p1,也有可能唤醒c2,因为它们都在条件变量cond上等待
        7. 假设此时c2被唤醒了,然后c2发现缓冲区里没有资源,于是继续睡了
        8. 于是现在3个线程p1、c1、c2都在睡条件变量也没人来改变,也没人唤醒它们,进程就卡死了
    • 最终的单值缓冲区生产者/消费者问题方案

      • 为了保证生产者只能唤醒消费者消费者只能唤醒生产者,我们使用两个条件变量来实现单值有界缓冲区问题
        cond_t empty, fill; // 定义条件变量,empty表示发送信号时缓冲区空,fill表示发送信号时缓冲区满
        mutex_t mutex; // 定义互斥锁
        
        /*
            producer()函数 生产者线程的行为
            参数:   loops int类型,生产者需要生产的产品数量
            返回值: 无
        */
        void* producer(int loops) {
            for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品
                Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源
                while (count == 1) { // 如果缓冲区有产品,生产者就需要等待
                    Pthread_cond_wait(&empty, &mutex); // 等待条件变量empty,等待时会自动释放mutex锁
                }
                // 生产者可以放入产品了,此时mutex锁已经被重新获得
                put(i); // 生产者放入产品
                pthread_cond_signal(&fill); // 生产者发出信号,通知消费者有产品了
                Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源
            }
        }
        
        /*
            consumer()函数 消费者线程的行为
            参数:   loops int类型,消费者需要消费的产品数量
            返回值: 无
        */
        void* consumer(int loops) {
            for (int i = 0; i < loops; ++i) {   // 循环取出指定数量的产品
                Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源
                while (count == 0) { // 如果缓冲区没有产品,消费者就需要等待
                    Pthread_cond_wait(&fill, &mutex); // 等待条件变量fill,等待时会自动释放mutex锁
                }
                // 消费者可以取出产品了,此时mutex锁已经被重新获得
                int value = get(); // 消费者取出产品
                std::cout << "Consumer got: " << value << std::endl; // 输出消费者取出的产品
                pthread_cond_signal(&empty); // 消费者发出信号,通知生产者可以放入产品了
                Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源
            }
        }
        
      • 生产者线程等待条件变量empty,发送信号给条件变量fill;消费者线程等待条件变量fill,发送信号给条件变量empty。由此,消费者只会唤醒生产者,生产者也只会唤醒消费者
    • 最终的生产者/消费者方案(多值缓冲区)

      #include <iostream>
      #include <pthread.h>
      
      #define MAX 10  // 设置缓冲区大小MAX
      
      int buffer[MAX];    // buffer int数组,共享缓冲区
      int fill_ptr = 0;  // fill_ptr表示缓冲区中下一个要填充的位置
      int use_ptr = 0;   // use_ptr表示缓冲区中下一个要使用的元素的位置
      int count = 0;     // count表示缓冲区中当前元素的数量
      
      void put(int value) {
          buffer[fill_ptr] = value;  // 将value放入缓冲区
          fill_ptr = (fill_ptr + 1) % MAX;  // 更新fill_ptr,循环使用缓冲区
          count++;  // 增加缓冲区中的元素数量
      }
      
      int get() {
          int tmp = buffer[use_ptr];  // 从缓冲区获取元素
          use_ptr = (use_ptr + 1) % MAX;  // 更新use_ptr,循环使用缓冲区
          count--;  // 减少缓冲区中的元素数量
          return tmp;  // 返回获取的元素
      }
      
      cond_t empty, fill; // 定义条件变量empty和fill
      mutex_t mutex;  // 定义互斥锁mutex
      
      void* producer(void* arg) {
          for (int i = 0; i < *(int*)arg; i++) {  // 生产20个产品
              pthread_mutex_lock(&mutex);  // 加锁
              while (count == MAX) {  // 如果缓冲区满了,等待
                  pthread_cond_wait(&empty, &mutex);
              }
              put(i);  // 生产一个产品
              pthread_cond_signal(&fill);  // 通知消费者有新产品了
              pthread_mutex_unlock(&mutex);  // 解锁
          }
          return nullptr;
      }
      
      void* consumer(void* arg) {
          for (int i = 0; i < *(int*)arg; i++) {  // 消费20个产品
              pthread_mutex_lock(&mutex);  // 加锁
              while (count == 0) {  // 如果缓冲区空了,等待
                  pthread_cond_wait(&fill, &mutex);
              }
              int tmp = get();  // 消费一个产品
              std::cout << "Consumed: " << tmp << std::endl;  // 输出消费的产品
              pthread_cond_signal(&empty);  // 通知生产者有空间了
              pthread_mutex_unlock(&mutex);  // 解锁
          }
          return nullptr;
      }