mq_timedsend函数详解
1. 函数介绍
mq_timedsend函数是Linux系统中用于在指定时间内发送消息到POSIX消息队列的函数。它是mq_send函数的增强版本,支持超时控制。可以把mq_timedsend想象成一个”限时消息发送器”,它能够在指定的时间内尝试发送消息,如果超时则返回错误。
这个函数特别适用于需要控制发送等待时间的场景,比如实时系统或需要避免无限期阻塞的应用程序。
使用场景:
- 实时系统的消息发送
- 避免无限期阻塞的发送操作
- 超时控制的网络应用
- 高可用性系统中的消息处理
2. 函数原型
#include <mqueue.h>
#include <time.h>
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abs_timeout);
3. 功能
mq_timedsend函数的主要功能是在指定的绝对超时时间内发送消息到消息队列。如果队列已满且在超时时间内无法发送,则返回错误。
4. 参数
- mqdes: 消息队列描述符
- 类型:mqd_t
- 含义:已打开的消息队列描述符
- msg_ptr: 消息内容指针
- 类型:const char*
- 含义:指向要发送的消息内容
- msg_len: 消息长度
- 类型:size_t
- 含义:消息内容的字节数
- msg_prio: 消息优先级
- 类型:unsigned int
- 含义:消息的优先级(0-32767)
- abs_timeout: 绝对超时时间
- 类型:const struct timespec*
- 含义:绝对超时时间(基于CLOCK_REALTIME)
5. 返回值
- 成功: 返回0
- 失败: 返回-1,并设置errno错误码
- EAGAIN:超时时间内无法发送消息
- EBADF:无效的消息队列描述符
- EINTR:被信号中断
- EINVAL:参数无效
- EMSGSIZE:消息大小超过队列限制
- ETIMEDOUT:超时
6. 相似函数或关联函数
- mq_send(): 发送消息(阻塞)
- mq_receive(): 接收消息
- mq_timedreceive(): 限时接收消息
- clock_gettime(): 获取当前时间
- pthread_cond_timedwait(): 限时条件等待
7. 示例代码
示例1:基础mq_timedsend使用 – 超时控制发送
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
// 创建带限制的消息队列
mqd_t create_limited_queue(const char* name) {
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 2, // 很小的队列容量
.mq_msgsize = 128,
.mq_curmsgs = 0
};
mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建消息队列失败");
return -1;
}
printf("创建限制队列: %s (容量: %ld)\n", name, attr.mq_maxmsg);
return mq;
}
// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
perror("获取当前时间失败");
return -1;
}
abs_timeout->tv_sec += seconds;
return 0;
}
int main() {
printf("=== 基础mq_timedsend使用示例 ===\n");
const char* queue_name = "/timed_queue";
// 创建限制队列
mqd_t mq = create_limited_queue(queue_name);
if (mq == -1) {
exit(EXIT_FAILURE);
}
// 填满队列
printf("1. 填满队列:\n");
for (int i = 0; i < 2; i++) {
char message[64];
snprintf(message, sizeof(message), "填充消息 %d", i + 1);
if (mq_send(mq, message, strlen(message), 0) == -1) {
perror("发送填充消息失败");
} else {
printf("发送: %s\n", message);
}
}
// 显示队列状态
struct mq_attr attr;
if (mq_getattr(mq, &attr) == 0) {
printf("队列当前消息数: %ld/%ld\n", attr.mq_curmsgs, attr.mq_maxmsg);
}
// 演示mq_timedsend超时
printf("\n2. 演示mq_timedsend超时:\n");
struct timespec abs_timeout;
if (calculate_absolute_timeout(&abs_timeout, 3) == -1) { // 3秒超时
mq_close(mq);
mq_unlink(queue_name);
exit(EXIT_FAILURE);
}
char test_message[] = "超时测试消息";
printf("尝试发送消息(队列已满,3秒超时):\n");
clock_t start_time = clock();
int result = mq_timedsend(mq, test_message, strlen(test_message), 0, &abs_timeout);
clock_t end_time = clock();
double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
if (result == 0) {
printf("✓ 消息发送成功\n");
} else {
if (errno == ETIMEDOUT) {
printf("✗ 发送超时 (耗时: %.2f 秒)\n", elapsed_time);
} else if (errno == EAGAIN) {
printf("✗ 队列满,无法发送: %s\n", strerror(errno));
} else {
printf("✗ 发送失败: %s\n", strerror(errno));
}
}
// 演示成功的mq_timedsend
printf("\n3. 演示成功的mq_timedsend:\n");
// 先接收一条消息,为发送腾出空间
char buffer[128];
ssize_t bytes_received = mq_receive(mq, buffer, sizeof(buffer), NULL);
if (bytes_received > 0) {
buffer[bytes_received] = '\0';
printf("接收消息为发送腾出空间: %s\n", buffer);
}
// 现在队列有空间了
if (calculate_absolute_timeout(&abs_timeout, 5) == 0) { // 5秒超时
char success_message[] = "成功发送的消息";
printf("发送消息(队列有空间):\n");
if (mq_timedsend(mq, success_message, strlen(success_message), 5, &abs_timeout) == 0) {
printf("✓ 消息发送成功 (优先级: 5)\n");
} else {
printf("✗ 发送失败: %s\n", strerror(errno));
}
}
// 演示不同超时时间的效果
printf("\n4. 不同超时时间演示:\n");
// 立即超时(过去的时间)
struct timespec past_time = {0, 0};
char immediate_message[] = "立即超时消息";
printf("使用过去时间作为超时(立即返回):\n");
if (mq_timedsend(mq, immediate_message, strlen(immediate_message), 0, &past_time) == -1) {
if (errno == ETIMEDOUT) {
printf("✓ 立即超时 (预期行为)\n");
} else {
printf("✗ 其他错误: %s\n", strerror(errno));
}
}
// 长时间超时
if (calculate_absolute_timeout(&abs_timeout, 10) == 0) { // 10秒超时
char long_timeout_message[] = "长超时消息";
printf("使用长超时时间:\n");
if (mq_timedsend(mq, long_timeout_message, strlen(long_timeout_message), 1, &abs_timeout) == 0) {
printf("✓ 长超时发送成功\n");
} else {
printf("✗ 长超时发送失败: %s\n", strerror(errno));
}
}
// 清理资源
printf("\n5. 清理资源:\n");
mq_close(mq);
mq_unlink(queue_name);
printf("队列已清理\n");
printf("\n=== 基础mq_timedsend演示完成 ===\n");
return 0;
}
示例2:实时系统中的超时消息发送
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#define MAX_MESSAGES 10
#define MESSAGE_SIZE 256
volatile sig_atomic_t stop_flag = 0;
// 信号处理函数
void signal_handler(int sig) {
printf("收到信号 %d,准备停止...\n", sig);
stop_flag = 1;
}
// 创建实时消息队列
mqd_t create_realtime_queue(const char* name) {
struct mq_attr attr = {
.mq_flags = 0,
.mq_maxmsg = 5,
.mq_msgsize = MESSAGE_SIZE,
.mq_curmsgs = 0
};
mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("创建实时队列失败");
return -1;
}
printf("创建实时队列: %s\n", name);
return mq;
}
// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
perror("获取当前时间失败");
return -1;
}
// 转换毫秒到秒和纳秒
long seconds = milliseconds / 1000;
long nanoseconds = (milliseconds % 1000) * 1000000;
abs_timeout->tv_sec += seconds;
abs_timeout->tv_nsec += nanoseconds;
// 处理纳秒溢出
if (abs_timeout->tv_nsec >= 1000000000) {
abs_timeout->tv_sec++;
abs_timeout->tv_nsec -= 1000000000;
}
return 0;
}
// 实时消息发送器
void realtime_message_sender(mqd_t mq, const char* sender_name) {
printf("实时发送器 %s 启动\n", sender_name);
srand(time(NULL));
int message_count = 0;
while (!stop_flag && message_count < MAX_MESSAGES) {
char message[MESSAGE_SIZE];
snprintf(message, sizeof(message), "%s: 实时消息 %d", sender_name, message_count + 1);
// 随机超时时间(10-100毫秒)
int timeout_ms = 10 + rand() % 91;
struct timespec abs_timeout;
if (calculate_relative_timeout(&abs_timeout, timeout_ms) == -1) {
continue;
}
// 使用mq_timedsend发送消息
unsigned int priority = rand() % 10;
int result = mq_timedsend(mq, message, strlen(message), priority, &abs_timeout);
if (result == 0) {
printf("[%s] 发送成功: %s (优先级: %u, 超时: %dms)\n",
sender_name, message, priority, timeout_ms);
} else {
if (errno == ETIMEDOUT) {
printf("[%s] 发送超时: %s (超时: %dms)\n",
sender_name, message, timeout_ms);
} else if (errno == EAGAIN) {
printf("[%s] 队列满,发送失败: %s\n", sender_name, message);
} else {
printf("[%s] 发送错误: %s (%s)\n",
sender_name, message, strerror(errno));
}
}
message_count++;
usleep(500000); // 0.5秒间隔
}
printf("实时发送器 %s 完成\n", sender_name);
}
// 消息接收器
void message_receiver(mqd_t mq, const char* receiver_name) {
printf("消息接收器 %s 启动\n", receiver_name);
char buffer[MESSAGE_SIZE];
ssize_t bytes_received;
unsigned int priority;
int received_count = 0;
while (!stop_flag && received_count < MAX_MESSAGES * 2) {
struct timespec abs_timeout;
if (calculate_relative_timeout(&abs_timeout, 2000) == -1) { // 2秒超时
continue;
}
bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
if (bytes_received > 0) {
buffer[bytes_received] = '\0';
printf("[%s] 接收: %s (优先级: %u)\n", receiver_name, buffer, priority);
received_count++;
} else if (errno == ETIMEDOUT) {
printf("[%s] 接收超时\n", receiver_name);
} else if (errno == EAGAIN) {
printf("[%s] 暂无消息\n", receiver_name);
usleep(100000); // 0.1秒后重试
} else {
printf("[%s] 接收错误: %s\n", receiver_name, strerror(errno));
break;
}
}
printf("消息接收器 %s 完成,接收 %d 条消息\n", receiver_name, received_count);
}
int main() {
printf("=== 实时系统超时消息发送示例 ===\n");
const char* queue_name = "/realtime_queue";
// 设置信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
// 创建实时队列
mqd_t mq = create_realtime_queue(queue_name);
if (mq == -1) {
exit(EXIT_FAILURE);
}
// 启动发送器和接收器
pid_t sender1 = fork();
if (sender1 == 0) {
realtime_message_sender(mq, "发送器1");
exit(EXIT_SUCCESS);
}
pid_t sender2 = fork();
if (sender2 == 0) {
realtime_message_sender(mq, "发送器2");
exit(EXIT_SUCCESS);
}
pid_t receiver = fork();
if (receiver == 0) {
message_receiver(mq, "接收器");
exit(EXIT_SUCCESS);
}
// 主进程等待一段时间后发送停止信号
printf("系统运行中... 按Ctrl+C停止或等待30秒\n");
int elapsed = 0;
while (elapsed < 30 && !stop_flag) {
sleep(1);
elapsed++;
// 定期显示队列状态
if (elapsed % 5 == 0) {
struct mq_attr attr;
if (mq_getattr(mq, &attr) == 0) {
printf("队列状态: %ld/%ld 消息\n", attr.mq_curmsgs, attr.mq_maxmsg);
}
}
}
// 发送停止信号
stop_flag = 1;
printf("发送停止信号...\n");
// 等待所有子进程完成
waitpid(sender1, NULL, 0);
waitpid(sender2, NULL, 0);
waitpid(receiver, NULL, 0);
// 清理资源
mq_close(mq);
mq_unlink(queue_name);
printf("系统已停止,资源已清理\n");
printf("\n=== 实时系统演示完成 ===\n");
return 0;
}