mq_timedsend系统调用及示例

mq_timedsend函数详解

1. 函数介绍

mq_timedsend函数是Linux系统中用于在指定时间内发送消息到POSIX消息队列的函数。它是mq_send函数的增强版本,支持超时控制。可以把mq_timedsend想象成一个”限时消息发送器”,它能够在指定的时间内尝试发送消息,如果超时则返回错误。

这个函数特别适用于需要控制发送等待时间的场景,比如实时系统或需要避免无限期阻塞的应用程序。

使用场景:

  • 实时系统的消息发送
  • 避免无限期阻塞的发送操作
  • 超时控制的网络应用
  • 高可用性系统中的消息处理

2. 函数原型

#include <mqueue.h>
#include <time.h>

int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, 
                 unsigned int msg_prio, const struct timespec *abs_timeout);

3. 功能

mq_timedsend函数的主要功能是在指定的绝对超时时间内发送消息到消息队列。如果队列已满且在超时时间内无法发送,则返回错误。

4. 参数

  • mqdes: 消息队列描述符
    • 类型:mqd_t
    • 含义:已打开的消息队列描述符
  • msg_ptr: 消息内容指针
    • 类型:const char*
    • 含义:指向要发送的消息内容
  • msg_len: 消息长度
    • 类型:size_t
    • 含义:消息内容的字节数
  • msg_prio: 消息优先级
    • 类型:unsigned int
    • 含义:消息的优先级(0-32767)
  • abs_timeout: 绝对超时时间
    • 类型:const struct timespec*
    • 含义:绝对超时时间(基于CLOCK_REALTIME)

5. 返回值

  • 成功: 返回0
  • 失败: 返回-1,并设置errno错误码
    • EAGAIN:超时时间内无法发送消息
    • EBADF:无效的消息队列描述符
    • EINTR:被信号中断
    • EINVAL:参数无效
    • EMSGSIZE:消息大小超过队列限制
    • ETIMEDOUT:超时

6. 相似函数或关联函数

  • mq_send(): 发送消息(阻塞)
  • mq_receive(): 接收消息
  • mq_timedreceive(): 限时接收消息
  • clock_gettime(): 获取当前时间
  • pthread_cond_timedwait(): 限时条件等待

7. 示例代码

示例1:基础mq_timedsend使用 – 超时控制发送

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>

// 创建带限制的消息队列
mqd_t create_limited_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 2,      // 很小的队列容量
        .mq_msgsize = 128,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建消息队列失败");
        return -1;
    }
    
    printf("创建限制队列: %s (容量: %ld)\n", name, attr.mq_maxmsg);
    return mq;
}

// 计算绝对超时时间
int calculate_absolute_timeout(struct timespec* abs_timeout, int seconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    abs_timeout->tv_sec += seconds;
    return 0;
}

int main() {
    printf("=== 基础mq_timedsend使用示例 ===\n");
    
    const char* queue_name = "/timed_queue";
    
    // 创建限制队列
    mqd_t mq = create_limited_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 填满队列
    printf("1. 填满队列:\n");
    for (int i = 0; i < 2; i++) {
        char message[64];
        snprintf(message, sizeof(message), "填充消息 %d", i + 1);
        
        if (mq_send(mq, message, strlen(message), 0) == -1) {
            perror("发送填充消息失败");
        } else {
            printf("发送: %s\n", message);
        }
    }
    
    // 显示队列状态
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == 0) {
        printf("队列当前消息数: %ld/%ld\n", attr.mq_curmsgs, attr.mq_maxmsg);
    }
    
    // 演示mq_timedsend超时
    printf("\n2. 演示mq_timedsend超时:\n");
    
    struct timespec abs_timeout;
    if (calculate_absolute_timeout(&abs_timeout, 3) == -1) {  // 3秒超时
        mq_close(mq);
        mq_unlink(queue_name);
        exit(EXIT_FAILURE);
    }
    
    char test_message[] = "超时测试消息";
    printf("尝试发送消息(队列已满,3秒超时):\n");
    
    clock_t start_time = clock();
    int result = mq_timedsend(mq, test_message, strlen(test_message), 0, &abs_timeout);
    clock_t end_time = clock();
    
    double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
    
    if (result == 0) {
        printf("✓ 消息发送成功\n");
    } else {
        if (errno == ETIMEDOUT) {
            printf("✗ 发送超时 (耗时: %.2f 秒)\n", elapsed_time);
        } else if (errno == EAGAIN) {
            printf("✗ 队列满,无法发送: %s\n", strerror(errno));
        } else {
            printf("✗ 发送失败: %s\n", strerror(errno));
        }
    }
    
    // 演示成功的mq_timedsend
    printf("\n3. 演示成功的mq_timedsend:\n");
    
    // 先接收一条消息,为发送腾出空间
    char buffer[128];
    ssize_t bytes_received = mq_receive(mq, buffer, sizeof(buffer), NULL);
    if (bytes_received > 0) {
        buffer[bytes_received] = '\0';
        printf("接收消息为发送腾出空间: %s\n", buffer);
    }
    
    // 现在队列有空间了
    if (calculate_absolute_timeout(&abs_timeout, 5) == 0) {  // 5秒超时
        char success_message[] = "成功发送的消息";
        printf("发送消息(队列有空间):\n");
        
        if (mq_timedsend(mq, success_message, strlen(success_message), 5, &abs_timeout) == 0) {
            printf("✓ 消息发送成功 (优先级: 5)\n");
        } else {
            printf("✗ 发送失败: %s\n", strerror(errno));
        }
    }
    
    // 演示不同超时时间的效果
    printf("\n4. 不同超时时间演示:\n");
    
    // 立即超时(过去的时间)
    struct timespec past_time = {0, 0};
    char immediate_message[] = "立即超时消息";
    printf("使用过去时间作为超时(立即返回):\n");
    if (mq_timedsend(mq, immediate_message, strlen(immediate_message), 0, &past_time) == -1) {
        if (errno == ETIMEDOUT) {
            printf("✓ 立即超时 (预期行为)\n");
        } else {
            printf("✗ 其他错误: %s\n", strerror(errno));
        }
    }
    
    // 长时间超时
    if (calculate_absolute_timeout(&abs_timeout, 10) == 0) {  // 10秒超时
        char long_timeout_message[] = "长超时消息";
        printf("使用长超时时间:\n");
        if (mq_timedsend(mq, long_timeout_message, strlen(long_timeout_message), 1, &abs_timeout) == 0) {
            printf("✓ 长超时发送成功\n");
        } else {
            printf("✗ 长超时发送失败: %s\n", strerror(errno));
        }
    }
    
    // 清理资源
    printf("\n5. 清理资源:\n");
    mq_close(mq);
    mq_unlink(queue_name);
    printf("队列已清理\n");
    
    printf("\n=== 基础mq_timedsend演示完成 ===\n");
    
    return 0;
}

示例2:实时系统中的超时消息发送

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

#define MAX_MESSAGES 10
#define MESSAGE_SIZE 256

volatile sig_atomic_t stop_flag = 0;

// 信号处理函数
void signal_handler(int sig) {
    printf("收到信号 %d,准备停止...\n", sig);
    stop_flag = 1;
}

// 创建实时消息队列
mqd_t create_realtime_queue(const char* name) {
    struct mq_attr attr = {
        .mq_flags = 0,
        .mq_maxmsg = 5,
        .mq_msgsize = MESSAGE_SIZE,
        .mq_curmsgs = 0
    };
    
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR, 0644, &attr);
    if (mq == (mqd_t)-1) {
        perror("创建实时队列失败");
        return -1;
    }
    
    printf("创建实时队列: %s\n", name);
    return mq;
}

// 计算相对超时时间
int calculate_relative_timeout(struct timespec* abs_timeout, int milliseconds) {
    if (clock_gettime(CLOCK_REALTIME, abs_timeout) == -1) {
        perror("获取当前时间失败");
        return -1;
    }
    
    // 转换毫秒到秒和纳秒
    long seconds = milliseconds / 1000;
    long nanoseconds = (milliseconds % 1000) * 1000000;
    
    abs_timeout->tv_sec += seconds;
    abs_timeout->tv_nsec += nanoseconds;
    
    // 处理纳秒溢出
    if (abs_timeout->tv_nsec >= 1000000000) {
        abs_timeout->tv_sec++;
        abs_timeout->tv_nsec -= 1000000000;
    }
    
    return 0;
}

// 实时消息发送器
void realtime_message_sender(mqd_t mq, const char* sender_name) {
    printf("实时发送器 %s 启动\n", sender_name);
    
    srand(time(NULL));
    
    int message_count = 0;
    while (!stop_flag && message_count < MAX_MESSAGES) {
        char message[MESSAGE_SIZE];
        snprintf(message, sizeof(message), "%s: 实时消息 %d", sender_name, message_count + 1);
        
        // 随机超时时间(10-100毫秒)
        int timeout_ms = 10 + rand() % 91;
        
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, timeout_ms) == -1) {
            continue;
        }
        
        // 使用mq_timedsend发送消息
        unsigned int priority = rand() % 10;
        int result = mq_timedsend(mq, message, strlen(message), priority, &abs_timeout);
        
        if (result == 0) {
            printf("[%s] 发送成功: %s (优先级: %u, 超时: %dms)\n", 
                   sender_name, message, priority, timeout_ms);
        } else {
            if (errno == ETIMEDOUT) {
                printf("[%s] 发送超时: %s (超时: %dms)\n", 
                       sender_name, message, timeout_ms);
            } else if (errno == EAGAIN) {
                printf("[%s] 队列满,发送失败: %s\n", sender_name, message);
            } else {
                printf("[%s] 发送错误: %s (%s)\n", 
                       sender_name, message, strerror(errno));
            }
        }
        
        message_count++;
        usleep(500000);  // 0.5秒间隔
    }
    
    printf("实时发送器 %s 完成\n", sender_name);
}

// 消息接收器
void message_receiver(mqd_t mq, const char* receiver_name) {
    printf("消息接收器 %s 启动\n", receiver_name);
    
    char buffer[MESSAGE_SIZE];
    ssize_t bytes_received;
    unsigned int priority;
    int received_count = 0;
    
    while (!stop_flag && received_count < MAX_MESSAGES * 2) {
        struct timespec abs_timeout;
        if (calculate_relative_timeout(&abs_timeout, 2000) == -1) {  // 2秒超时
            continue;
        }
        
        bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &abs_timeout);
        
        if (bytes_received > 0) {
            buffer[bytes_received] = '\0';
            printf("[%s] 接收: %s (优先级: %u)\n", receiver_name, buffer, priority);
            received_count++;
        } else if (errno == ETIMEDOUT) {
            printf("[%s] 接收超时\n", receiver_name);
        } else if (errno == EAGAIN) {
            printf("[%s] 暂无消息\n", receiver_name);
            usleep(100000);  // 0.1秒后重试
        } else {
            printf("[%s] 接收错误: %s\n", receiver_name, strerror(errno));
            break;
        }
    }
    
    printf("消息接收器 %s 完成,接收 %d 条消息\n", receiver_name, received_count);
}

int main() {
    printf("=== 实时系统超时消息发送示例 ===\n");
    
    const char* queue_name = "/realtime_queue";
    
    // 设置信号处理
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);
    
    // 创建实时队列
    mqd_t mq = create_realtime_queue(queue_name);
    if (mq == -1) {
        exit(EXIT_FAILURE);
    }
    
    // 启动发送器和接收器
    pid_t sender1 = fork();
    if (sender1 == 0) {
        realtime_message_sender(mq, "发送器1");
        exit(EXIT_SUCCESS);
    }
    
    pid_t sender2 = fork();
    if (sender2 == 0) {
        realtime_message_sender(mq, "发送器2");
        exit(EXIT_SUCCESS);
    }
    
    pid_t receiver = fork();
    if (receiver == 0) {
        message_receiver(mq, "接收器");
        exit(EXIT_SUCCESS);
    }
    
    // 主进程等待一段时间后发送停止信号
    printf("系统运行中... 按Ctrl+C停止或等待30秒\n");
    
    int elapsed = 0;
    while (elapsed < 30 && !stop_flag) {
        sleep(1);
        elapsed++;
        
        // 定期显示队列状态
        if (elapsed % 5 == 0) {
            struct mq_attr attr;
            if (mq_getattr(mq, &attr) == 0) {
                printf("队列状态: %ld/%ld 消息\n", attr.mq_curmsgs, attr.mq_maxmsg);
            }
        }
    }
    
    // 发送停止信号
    stop_flag = 1;
    printf("发送停止信号...\n");
    
    // 等待所有子进程完成
    waitpid(sender1, NULL, 0);
    waitpid(sender2, NULL, 0);
    waitpid(receiver, NULL, 0);
    
    // 清理资源
    mq_close(mq);
    mq_unlink(queue_name);
    printf("系统已停止,资源已清理\n");
    
    printf("\n=== 实时系统演示完成 ===\n");
    
    return 0;
}
此条目发表在linux文章分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注