目录
1. 概述
消息队列可认为是一个消息链表,队列中的每个消息具有如下属性:
- 消息优先级,由发送者赋予
- 消息数据长度,可以为0
- 消息数据(如果消息数据长度大于0)
Posix消息队列主要用于线程间消息的传递:
- A线程向队列中放置消息,B线程从队列中取出消息
- A线程向队列写入消息之前,不需要B线程在该队列上等待消息的到达
- A线程向队列写入消息之后,B线程可以在之后的某个时刻取出消息
- A线程只关心向队列放入消息,B线程只关心从队列取出消息,A、B两个线程相互独立、互不影响
2. Posix消息队列
创建与打开
mq_open
用于创建一个新的消息队列或打开一个已存在的消息队列,编译时需指定链接-lrt,下面其他函数同理。
//成功返回消息队列描述符,失败返回-1
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
- 当创建一个新的消息队列时,attr参数用于给新队列指定某些属性,若attr为NULL,则使用默认属性
- mq_open的返回值称为消息队列描述符,它的类型取决于系统实现,可能是整型或指针
- Linux下的Posix消息队列创建在虚拟文件系统中,正常情况下是不可见的,需要挂载到
/dev/mqueue/
目录才可以查看
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue
关闭与删除
mq_close
用于关闭已打开的消息队列,mq_unlink
用于从系统中删除消息队列。
//两个函数返回值:成功返回0,失败返回-1
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
关闭与删除机制已在Posix信号量中讲过,这里不再赘述。
消息队列属性
获取属性
//成功返回0,失败返回-1
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mq_getattr
用于获取消息队列的四个属性,这四个属性定义在struct mq_attr
结构体中。
struct mq_attr
{
long mq_flags; //非阻塞标志,可设0或O_NONBLOCK,由且仅由mq_setattr设置
long mq_maxmsg; //队列中最大消息条数,由mq_open在创建新队列时设置
long mq_msgsize; //消息最大长度,由mq_open在创建新队列时设置
long mq_curmsgs; //队列中当前消息条数,只能获取不能设置
};
设置属性
//成功返回0,失败返回-1
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oldattr);
在消息队列的四个属性中:
- mq_curmsgs只能获取不能设置
- mq_flags只能通过mq_setattr设置,该函数的唯一作用就是设置或清除非阻塞标志
mq_maxmsg
和mq_msgsize
只能在创建新队列时由mq_open的attr参数设置- mq_maxmsg和mq_msgsize必须同时指定,否则mq_open创建新队列会失败
#include <mqueue.h>
#include <stdio.h>
int main()
{
struct mq_attr attr;
struct mq_attr attr1;
mqd_t mqdes;
/*
* 在我的系统上,消息队列默认属性为:mq_maxmsg = 10, mq_msgsize = 8192.
* 这里显式指定attr.mq_maxmsg = 5,mq_msgsize不赋值,会导致mq_open失败.
*/
attr.mq_maxmsg = 5;
//attr.mq_msgsize = 8192;
if ((mqdes = mq_open("/mqueue1", O_RDWR | O_CREAT, 0666, &attr)) == -1)
{
printf("mq_open create new mqueue failed because attr.mq_msgsize not specified.\n");
}
mq_getattr(mqdes, &attr1);
printf("%ld %ld\n", attr1.mq_maxmsg, attr1.mq_msgsize);
mq_close(mqdes);
mq_unlink("/mqueue1");
return 0;
}
消息发送与接收
mq_send
用于向队列中放入一个消息,mq_receive
用于从队列中取走一个消息。
//成功返回0,失败返回-1
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
//成功返回消息数据长度,失败返回-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);
- prio是消息优先级,范围为[0, MQ_PRIO_MAX - 1],prio值越大,消息优先级越高
- 如果不关心消息优先级,就分别给mq_send和mq_receive的prio参数传0和传NULL
- mq_receive总是返回最高优先级的最早消息
- mq_receive的参数len指的是接收缓冲区大小,它必须大于等于该队列的mq_msgsize,否则会立即出错返回
- 可以先调用mq_getattr获得mq_msgsize的值,然后再动态分配接收缓冲区
3. 消息队列限制
消息队列共有4个属性受到系统限制:
- msg_max
- msgsize_max
- MQ_OPEN_MAX
- MQ_PRIO_MAX
其中,前两个限制和应用程序的开发密切相关,既要保证队列不会被填满,又要保证消息长度不会超过允许的最大值,必要时可以修改Linux内核源码来改变限制值。
查看限制的方法:
cat /proc/sys/fs/mqueue/msg_max //struct mq_attr.mq_maxmsg <= msg_max
cat /proc/sys/fs/mqueue/msgsize_max //struct mq_attr.mq_msgsize <= msgsize_max
cat /proc/sys/fs/mqueue/queues_max
4. 生产者消费者问题——Posix消息队列实现
不难看出,Posix消息队列的基本使用模型就是一个典型的生产者消费者问题:
- 如果使用无优先级的消息,那么消息是按照先进先出的顺序处理的
- 因此,Posix消息队列也可以作为生产者消费者问题中的队列缓冲区
单生产者 + 单消费者
我们把前面写的生产