目录
1. 概述
定义
生产者消费者问题是线程同步的经典问题,也称为有界缓冲区问题,问题描述大致如下:
- 生产者和消费者之间共享一个有界数据缓冲区
- 一个或多个生产者(线程或进程)向缓冲区放置数据
- 一个或多个消费者(线程或进程)从缓冲区取出数据
缓冲区
生产者消费者问题中的缓冲区,包括队列缓冲区和环形缓冲区,它们都按照先进先出的顺序处理数据,我们现在只考虑队列缓冲区:
- 队列缓冲区通常使用普通的队列数据结构
- 队列内部实现可以是链表或数组
缓冲区有两个极端状态:缓冲区空,缓冲区满。链表队列和数组队列缓冲区空的含义相同,都是队列中没有一个元素的情形,但两者缓冲区满的含义不同:
- 数组队列在初始化时就必须指定最大容量,缓冲区满的条件很清晰
- 链表队列没有最大容量的概念,需要人为指定
此外,Posix消息队列也可以作为队列缓冲区,Posix当以无优先级消息的方式使用时,也是按照先进先出的顺序进行处理的。
本文只讨论第一种数据结构队列缓冲区,基于Posix消息队列缓冲区的生产者消费者问题,会在后续Posix消息队列中单独讲解。
2. 典型模型
生产者消费者个数的多少、缓冲区的类型都会影响生产者消费者问题模型的复杂度,本文选取两种常见典型模型进行分析。
模型一
- 单个生产者 + 单个消费者
- 生产者线程启动后,立即创建消费者线程
- 缓冲区容量有限,且小于数据条目数量
该模型只需要处理生产者和消费者之间的同步问题,在实际工程很常见,具体的同步详情为:
- 当缓冲区为空时,消费者不能从其中取出数据
- 当缓冲区为满时,生产者不能向其中写入数据
模型二
- 多个生产者 + 单个消费者
- 生产者线程启动后,立即创建消费者线程
- 缓冲区容量有限,且小于数据条目数量
模型二与模型一相比,既需要处理生产者之间的同步问题,又需要处理生产者和消费者之间的同步问题,在实际工程也比较常见,具体的同步详情为:
- 同时只能有一个生产者向缓冲区写入数据
- 当缓冲区为空时,消费者不能从其中取出数据
- 当缓冲区为满时,生产者不能向其中写入数据
可选需求
模型一和模型二所列均为必须处理的同步问题,还有一个根据实际情况、可能会存在的同步需求:
- 共享数据中包含描述缓冲区当前状态的变量,如下标、计数、链表等
- 生产者和消费者在读写缓冲区后都需要更新缓冲区状态变量
- 若满足上述两个条件,则同时只能有一个生产者或消费者可以进行缓冲区操作与状态变量更新
队列缓冲区,不管是数组实现还是链表实现,其内部都符合上述条件,都需要处理该可选同步需求。
3. 数据结构队列C语言实现
网上找了份数据结构队列C语言实现的代码,稍微改了下,可正常使用,本节后续的生产者消费者问题示例代码就是用它作为缓冲区。
#ifndef _LINK_QUEUE_H_
#define _LINK_QUEUE_H_
typedef enum
{
false = 0,
true
} bool;
typedef int data_t;
typedef struct LinkNode
{
data_t data;
struct LinkNode *next;
} LinkNode, *LinkQueue;
typedef struct
{
LinkQueue front;
LinkQueue rear;
} HeadQueue;
HeadQueue *CreateEmptyQueue();
bool EmptyLinkQueue(HeadQueue *queue);
void EnQueue(HeadQueue *queue, data_t value);
void DeQueue(HeadQueue *queue, data_t *value);
void PrintQueue(HeadQueue *queue);
bool ClearLinkQueue(HeadQueue *queue);
bool DestroyLinkQueue(HeadQueue *queue);
int GetCurItemsNum(HeadQueue *queue);
#endif
#include "linkqueue.h"
#include <stdio.h>
#include <stdlib.h>
static int nitems;
//创建空链表队列
HeadQueue *CreateEmptyQueue(void)
{
HeadQueue *queue = (HeadQueue *)malloc(sizeof(HeadQueue));
if (queue == NULL)
{
perror("Create empty queue failed");
exit(EXIT_FAILURE);
}
queue->rear = queue->front = NULL;
nitems = 0;
return queue;
}
//判断是否为空链表队列
bool EmptyLinkQueue(HeadQueue *queue)
{
if (queue == NULL)
{
printf("Empty link queue error!\n");
exit(EXIT_FAILURE);
}
return queue->front == NULL ? true : false;
}
//增加队列元素
void EnQueue(HeadQueue *queue, data_t value)
{
LinkQueue new;
if (queue == NULL)
{
printf("EnQueue Error!\n");
return;
}
new = (LinkQueue)malloc(sizeof(LinkNode));
if (new == NULL)
{
perror("Insert value failed");
return;
}
new->data = value;
new->next = NULL;
if (EmptyLinkQueue(queue))
{
queue->front = queue->rear = new;
}
else
{
queue->rear->next = new;
queue->rear = new;
}
nitems++;
}
//删除队列元素
void DeQueue(HeadQueue *queue, data_t *value)
{
*value = 0;
LinkQueue remove;
if (queue == NULL)
{
printf("DeQueue error!\n");
return;
}
if (EmptyLinkQueue(queue))
{
printf("queue is empty!\n");
return;
}
remove = queue->fron