我找到了解决办法
here
. 对我来说,要理解问题的一个棘手之处是:
-
生产者和消费者必须能够双向沟通。任何一种方法都是不够的。
-
这种双向通信可以打包成一个pthread条件。
举例来说,上面提到的博客文章证明了这实际上是有意义和可取的行为:
pthread_mutex_lock(&cond_mutex);
pthread_cond_broadcast(&cond):
pthread_cond_wait(&cond, &cond_mutex);
pthread_mutex_unlock(&cond_mutex);
其理念是,如果生产商和消费者都采用这种逻辑,他们中的任何一个都可以先睡觉,因为每个人都可以唤醒另一个角色。换一种说法,在一个典型的生产者-消费者场景中——如果消费者需要睡觉,那是因为生产者需要醒来,反之亦然。在单个pthread条件下封装此逻辑是有意义的。
当然,上面的代码有一种意想不到的行为,即当工作线程实际上只想唤醒生产者时,它还会唤醒另一个正在休眠的工作线程。这可以通过简单的变量检查来解决,如@borealid建议的那样:
while(!work_available) pthread_cond_wait(&cond, &cond_mutex);
在工人广播时,所有工人线程都将被唤醒,但一个接一个(因为隐式互斥锁在
pthread_cond_wait
)因为其中一个工作线程将消耗工作(设置
work_available
回到
false
,当其他工作线程唤醒并实际开始工作时,工作将不可用,因此工作线程将再次睡眠。
以下是我测试的一些注释代码,适用于任何感兴趣的人:
// gcc -Wall -pthread threads.c -lpthread
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
pthread_cond_t my_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t my_cond_m = PTHREAD_MUTEX_INITIALIZER;
int * next_work = NULL;
int all_work_done = 0;
void * worker(void * arg)
{
int * my_work = NULL;
while(!all_work_done)
{
pthread_mutex_lock(&my_cond_m);
if(next_work == NULL)
{
// Signal producer to give work
pthread_cond_broadcast(&my_cond);
// Wait for work to arrive
// It is wrapped in a while loop because the condition
// might be triggered by another worker thread intended
// to wake up the producer
while(!next_work && !all_work_done)
pthread_cond_wait(&my_cond, &my_cond_m);
}
// Work has arrived, cache it locally so producer can
// put in next work ASAP
my_work = next_work;
next_work = NULL;
pthread_mutex_unlock(&my_cond_m);
if(my_work)
{
printf("Worker %d consuming work: %d\n", (int)(pthread_self() % 100), *my_work);
free(my_work);
}
}
return NULL;
}
int * create_work()
{
int * ret = (int *)malloc(sizeof(int));
assert(ret);
*ret = rand() % 100;
return ret;
}
void * producer(void * arg)
{
int i;
for(i = 0; i < 10; i++)
{
pthread_mutex_lock(&my_cond_m);
while(next_work != NULL)
{
// There's still work, signal a worker to pick it up
pthread_cond_broadcast(&my_cond);
// Wait for work to be picked up
pthread_cond_wait(&my_cond, &my_cond_m);
}
// No work is available now, let's put work on the queue
next_work = create_work();
printf("Producer: Created work %d\n", *next_work);
pthread_mutex_unlock(&my_cond_m);
}
// Some workers might still be waiting, release them
pthread_cond_broadcast(&my_cond);
all_work_done = 1;
return NULL;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, worker, NULL);
pthread_create(&t2, NULL, worker, NULL);
pthread_create(&t3, NULL, worker, NULL);
pthread_create(&t4, NULL, worker, NULL);
producer(NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}