-
线程与条件
- 很多时候,线程需要满足一定条件才可以继续运行:
- 父线程需要等待子线程执行完毕才可以返回
- 自旋等待的目的满足后唤醒线程
- 线程需要使用条件变量来等待一个条件满足
- 很多时候,线程需要满足一定条件才可以继续运行:
-
定义和程序
- 条件变量(condition variable) 是一个显式队列
- 当某些执行条件(condition) 不满足时,线程可以把自己加入队列,并等待(waiting) 该条件
- 另外一个线程改变这个条件时,就可以唤醒一个或多个等待线程
- 条件变量提供了两个接口:wait() 和signal()
-
wait()
- 当线程需要睡眠的时候,就调用wait()
- wait()需要传入一个锁作为参数,且这个锁已经被锁住了
- wait()的职责就是释放锁,同时让调用wait()的线程休眠,这是一整个原子操作
- 当有其他线程调用signal()唤醒当前线程,当前线程才会继续运行,同时会尝试重新持有该锁
-
signal()
- 当线程想唤醒等待在某个条件变量上的睡眠线程时,就调用signal()
- 若一个线程在等待某个条件变量,那么当受到signal()唤醒后,它会尝试重新持有相关的锁
-
-
生产者/消费者(有界缓冲区)问题
- 生产者/消费者(producer/consumer)问题,也称为有界缓冲区(bounded-buffer)问题是指:
- 系统中有一个或多个生产者线程,以及一个或多个消费者线程
- 生产者会将生成的数据项放入缓冲区
- 消费者会从缓冲区中取走数据项,并以某种方式消费
- 有界缓冲区是一片共享资源,所以必须通过同步机制来访问它,以免出现竞态条件
-
单值缓冲区存入接口put()与取出接口get()
- 假设缓冲区只有一个单元,缓冲区满时消费者就可以消费资源
int buffer; // 只有一个单元的缓冲区 int count = 0; // 缓冲区中的产品数量 /* put()函数 提供生产者放入产品的接口 参数: value int类型,生产者放入缓冲区的产品 返回值: 无 */ void put(int value) { assert(count == 0); // 检查缓冲区是不是已有产品,如果已有产品,程序就会终止 count = 1; // 更新产品数量 buffer = value; // 放入产品 } /* get()函数 提供消费者取出产品的接口,并返回取出的产品 参数: 无 返回值: 消费者取出的产品(int类型) */ int get() { assert(count == 1); // 检查缓冲区有没有产品,如果没有产品,程序就会终止 count = 0; // 更新产品数量 return buffer; // 取出产品 } -
最简单的生产者线程与消费者线程函数
/* producer()函数 生产者线程的行为 参数: loops int类型,生产者需要生产的产品数量 返回值: 无 */ void* producer(int loops) { for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品 put(i); // 生产者放入产品,产品的值为i } } /* consumer()函数 消费者线程的行为 参数: loops int类型,消费者需要消费的产品数量 返回值: 无 */ void* consumer(int loops) { for (int i = 0; i < loops; ++i) { // 循环取出指定数量的产品 int value = get(); // 消费者取出产品,将产品存入value变量 std::cout << "Consumed: " << value << std::endl; // 显示消费的产品 } } -
有问题的方案
- 因为put()和get()中存在临界区,所以我们需要锁来隔离两个线程,同时我们还需要条件变量来在恰当的时候唤醒线程
-
首次尝试
cond_t cond; // 定义条件变量 mutex_t mutex; // 定义互斥锁(锁变量) /* producer()函数 生产者线程的行为 参数: loops int类型,生产者需要生产的产品数量 返回值: 无 */ void* producer(int loops) { for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品 Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源 if (count == 1) { // 如果缓冲区有产品,生产者就需要等待 Pthread_cond_wait(&cond, &mutex);// 等待条件变量cond,等待时会自动释放mutex锁 } // 等待条件变量完毕,生产者被唤醒,可以放入产品了,此时mutex锁已经被重新获得 put(i); // 生产者放入产品 pthread_cond_signal(&cond); // 生产者发出信号 Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源 } } /* consumer()函数 消费者线程的行为 参数: loops int类型,消费者需要消费的产品数量 返回值: 无 */ void* consumer(int loops) { for (int i = 0; i < loops; ++i) { // 循环取出指定数量的产品 Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源 if (count == 0) { // 如果缓冲区没有产品,消费者就需要等待 Pthread_cond_wait(&cond, &mutex); // 等待条件变量cond,等待时会自动释放mutex锁 } // 等待条件变量完毕,消费者被唤醒,可以取出产品了,此时mutex锁已经被重新获得 int value = get(); // 消费者取出产品 std::cout << "Consumer got: " << value << std::endl; // 输出消费者取出的产品 pthread_cond_signal(&cond); // 消费者发出信号 Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源 } } - 这段代码在只有一个生产者线程和一个消费者线程时可以正常运作,但是如果消费者变得更多,它将会出现问题: 假设现在有1个生产者线程p1,和2个消费者线程c1、c2
- 首先消费者c1运行,它会获得锁
- 然后检查缓冲区是否已满(count==1则为已满)
- c1发现缓冲区内没有资源,于是释放锁,并开始等待,直到条件变量将其唤醒
- 生产者p1运行,它会获得锁
- 然后检查缓冲区是否已满(count==1则为已满)
- p1发现count==0,说明缓冲区没有满,于是在缓冲区中放入一个资源,此时buffer被设为该资源的值,count被设为1
- 然后p1在条件变量cond上发送信号“状态改变了”,于是,现在等待在条件变量cond上的c1被唤醒,于是c1准备开始运行
- p1继续运行,直到在for循环体中再一次检查count,发现count为1,于是睡眠
- 但是,此时操作系统调度让c2先开始运行。c2一看,发现缓冲区满了,于是把资源取走了
- 然后c1开始运行,但是此时缓冲区的资源已经被消费了,而c1不知道这事,所以c1只能错误地取走一个已经被消费的、无效的值
- 在一个线程通过条件变量发送信号时,该线程只是唤醒了某一个线程,并不能保证该线程立刻开始执行,也就不能保证被唤醒的线程开始运行时一定是它期望的情况
- 信号的这种释义被称为Mesa语义
- 与之相对的有Hoare语义,它会保证线程被唤醒时立刻开始执行,但是实现难度很大
- 几乎所有系统都采用的是Mesa语义
-
较好但仍有问题的方案:while替代if
- 通过把if语句改为while语句,线程会在被唤醒时再检查一次count的值,这样可以有效避免上面出现的问题
/* producer()函数 生产者线程的行为 参数: loops int类型,生产者需要生产的产品数量 返回值: 无 */ void* producer(int loops) { for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品 Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源 while (count == 1) { // 如果缓冲区有产品,生产者就需要等待 Pthread_cond_wait(&cond, &mutex); // 等待条件变量cond,等待时会自动释放mutex锁 } // 等待条件变量完毕,生产者被唤醒,可以放入产品了,此时mutex锁已经被重新获得 put(i); // 生产者放入产品 pthread_cond_signal(&cond); // 生产者发出信号 Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源 } } /* consumer()函数 消费者线程的行为 参数: loops int类型,消费者需要消费的产品数量 返回值: 无 */ void* consumer(int loops) { for (int i = 0; i < loops; ++i) { // 循环取出指定数量的产品 Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源 while (count == 0) { // 如果缓冲区没有产品,消费者就需要等待 Pthread_cond_wait(&cond, &mutex); // 等待条件变量cond,等待时会自动释放mutex锁 } // 等待条件变量完毕,消费者被唤醒,可以取出产品了,此时mutex锁已经被重新获得 int value = get(); // 消费者取出产品 std::cout << "Consumer got: " << value << std::endl; // 输出消费者取出的产品 pthread_cond_signal(&cond); // 消费者发出信号 Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源 } } - 这段代码在出现多个消费者时仍然会出现问题:假设现在有1个生产者线程p1和2个消费者线程c1、c2
- 首先,两个消费者c1、c2依次运行,都发现缓冲区没东西,于是都睡了
- 生产者p1开始运行,放入数据,发送信号给c1,之后自己也睡了
- 此时c1准备继续运行,p1和c2都在同一个条件变量cond上等待
- c1继续运行,此时还在while循环体内,于是c1检查count,发现count和预期一样是1,于是退出while循环体
- 接下来c1消费缓冲区里的资源,然后在条件变量cond上发送信号,说“状态改变了“,然后自己睡了
- 但是此时,条件变量有可能唤醒p1,也有可能唤醒c2,因为它们都在条件变量cond上等待
- 假设此时c2被唤醒了,然后c2发现缓冲区里没有资源,于是继续睡了
- 于是现在3个线程p1、c1、c2都在睡,条件变量也没人来改变,也没人唤醒它们,进程就卡死了
- 通过把if语句改为while语句,线程会在被唤醒时再检查一次count的值,这样可以有效避免上面出现的问题
-
最终的单值缓冲区生产者/消费者问题方案
- 为了保证生产者只能唤醒消费者,消费者只能唤醒生产者,我们使用两个条件变量来实现单值有界缓冲区问题
cond_t empty, fill; // 定义条件变量,empty表示发送信号时缓冲区空,fill表示发送信号时缓冲区满 mutex_t mutex; // 定义互斥锁 /* producer()函数 生产者线程的行为 参数: loops int类型,生产者需要生产的产品数量 返回值: 无 */ void* producer(int loops) { for (int i = 0; i < loops; ++i) { // 循环生产指定数量的产品 Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源 while (count == 1) { // 如果缓冲区有产品,生产者就需要等待 Pthread_cond_wait(&empty, &mutex); // 等待条件变量empty,等待时会自动释放mutex锁 } // 生产者可以放入产品了,此时mutex锁已经被重新获得 put(i); // 生产者放入产品 pthread_cond_signal(&fill); // 生产者发出信号,通知消费者有产品了 Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源 } } /* consumer()函数 消费者线程的行为 参数: loops int类型,消费者需要消费的产品数量 返回值: 无 */ void* consumer(int loops) { for (int i = 0; i < loops; ++i) { // 循环取出指定数量的产品 Pthread_mutex_lock(&mutex); // 将mutex锁锁住,保护共享资源 while (count == 0) { // 如果缓冲区没有产品,消费者就需要等待 Pthread_cond_wait(&fill, &mutex); // 等待条件变量fill,等待时会自动释放mutex锁 } // 消费者可以取出产品了,此时mutex锁已经被重新获得 int value = get(); // 消费者取出产品 std::cout << "Consumer got: " << value << std::endl; // 输出消费者取出的产品 pthread_cond_signal(&empty); // 消费者发出信号,通知生产者可以放入产品了 Pthread_mutex_unlock(&mutex); // 将mutex锁解开,允许其他线程访问共享资源 } } - 生产者线程等待条件变量empty,发送信号给条件变量fill;消费者线程等待条件变量fill,发送信号给条件变量empty。由此,消费者只会唤醒生产者,生产者也只会唤醒消费者
- 为了保证生产者只能唤醒消费者,消费者只能唤醒生产者,我们使用两个条件变量来实现单值有界缓冲区问题
-
最终的生产者/消费者方案(多值缓冲区)
#include <iostream> #include <pthread.h> #define MAX 10 // 设置缓冲区大小MAX int buffer[MAX]; // buffer int数组,共享缓冲区 int fill_ptr = 0; // fill_ptr表示缓冲区中下一个要填充的位置 int use_ptr = 0; // use_ptr表示缓冲区中下一个要使用的元素的位置 int count = 0; // count表示缓冲区中当前元素的数量 void put(int value) { buffer[fill_ptr] = value; // 将value放入缓冲区 fill_ptr = (fill_ptr + 1) % MAX; // 更新fill_ptr,循环使用缓冲区 count++; // 增加缓冲区中的元素数量 } int get() { int tmp = buffer[use_ptr]; // 从缓冲区获取元素 use_ptr = (use_ptr + 1) % MAX; // 更新use_ptr,循环使用缓冲区 count--; // 减少缓冲区中的元素数量 return tmp; // 返回获取的元素 } cond_t empty, fill; // 定义条件变量empty和fill mutex_t mutex; // 定义互斥锁mutex void* producer(void* arg) { for (int i = 0; i < *(int*)arg; i++) { // 生产20个产品 pthread_mutex_lock(&mutex); // 加锁 while (count == MAX) { // 如果缓冲区满了,等待 pthread_cond_wait(&empty, &mutex); } put(i); // 生产一个产品 pthread_cond_signal(&fill); // 通知消费者有新产品了 pthread_mutex_unlock(&mutex); // 解锁 } return nullptr; } void* consumer(void* arg) { for (int i = 0; i < *(int*)arg; i++) { // 消费20个产品 pthread_mutex_lock(&mutex); // 加锁 while (count == 0) { // 如果缓冲区空了,等待 pthread_cond_wait(&fill, &mutex); } int tmp = get(); // 消费一个产品 std::cout << "Consumed: " << tmp << std::endl; // 输出消费的产品 pthread_cond_signal(&empty); // 通知生产者有空间了 pthread_mutex_unlock(&mutex); // 解锁 } return nullptr; }
- 生产者/消费者(producer/consumer)问题,也称为有界缓冲区(bounded-buffer)问题是指:
操作系统 - Operating System
/
Chapter III 并发
/
Section 1 线程
/
1-2 条件变量与信号量