1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
| #define _GNU_SOURCE #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <mqueue.h> #include <fcntl.h> #include <sys/stat.h> #include <sys/wait.h> #include <string.h> #include <errno.h> #include <time.h>
#define MAX_MESSAGES 5 #define MESSAGE_SIZE 256
// 生产者进程 void producer_process(const char* queue_name, int producer_id) { printf("生产者 %d 启动\n", producer_id); // 打开已存在的队列 mqd_t mq = mq_open(queue_name, O_WRONLY); if (mq == (mqd_t)-1) { perror("生产者打开队列失败"); exit(EXIT_FAILURE); } srand(time(NULL) + producer_id); // 发送消息 for (int i = 0; i < MAX_MESSAGES; i++) { char message[MESSAGE_SIZE]; snprintf(message, sizeof(message), "生产者%d的消息%d", producer_id, i + 1); // 随机优先级 unsigned int priority = rand() % 10; if (mq_send(mq, message, strlen(message), priority) == -1) { perror("发送消息失败"); } else { printf("生产者 %d 发送: %s (优先级: %u)\n", producer_id, message, priority); } sleep(1); // 模拟处理时间 } printf("生产者 %d 完成\n", producer_id); mq_close(mq); }
// 消费者进程 void consumer_process(const char* queue_name, int consumer_id) { printf("消费者 %d 启动\n", consumer_id); // 打开已存在的队列 mqd_t mq = mq_open(queue_name, O_RDONLY); if (mq == (mqd_t)-1) { perror("消费者打开队列失败"); exit(EXIT_FAILURE); } // 接收消息 char buffer[MESSAGE_SIZE]; ssize_t bytes_received; unsigned int priority; int message_count = 0; while (message_count < MAX_MESSAGES * 2) { // 期望接收所有生产者的消息 bytes_received = mq_receive(mq, buffer, sizeof(buffer), &priority); if (bytes_received > 0) { buffer[bytes_received] = '\0'; printf("消费者 %d 接收: %s (优先级: %u)\n", consumer_id, buffer, priority); message_count++; } else if (errno == EAGAIN) { // 非阻塞模式下没有消息 printf("消费者 %d: 暂无消息\n", consumer_id); sleep(1); } else { perror("接收消息失败"); break; } } printf("消费者 %d 完成,接收 %d 条消息\n", consumer_id, message_count); mq_close(mq); }
// 管理进程 void manager_process(const char* queue_name) { printf("管理进程启动\n"); // 创建消息队列 struct mq_attr attr = { .mq_flags = 0, .mq_maxmsg = 20, .mq_msgsize = MESSAGE_SIZE, .mq_curmsgs = 0 }; mqd_t mq = mq_open(queue_name, O_CREAT | O_RDWR, 0644, &attr); if (mq == (mqd_t)-1) { perror("管理进程创建队列失败"); exit(EXIT_FAILURE); } printf("管理进程创建队列: %s\n", queue_name); // 启动生产者和消费者进程 pid_t producers[2], consumers[2]; // 启动生产者 for (int i = 0; i < 2; i++) { producers[i] = fork(); if (producers[i] == 0) { producer_process(queue_name, i + 1); exit(EXIT_SUCCESS); } } // 启动消费者 for (int i = 0; i < 2; i++) { consumers[i] = fork(); if (consumers[i] == 0) { consumer_process(queue_name, i + 1); exit(EXIT_SUCCESS); } } // 等待生产者完成 printf("管理进程等待生产者完成...\n"); for (int i = 0; i < 2; i++) { waitpid(producers[i], NULL, 0); } printf("所有生产者已完成\n"); // 模拟一段时间让消费者处理完消息 sleep(3); // 删除队列名称(但队列仍存在,因为消费者还在使用) printf("管理进程删除队列名称...\n"); if (mq_unlink(queue_name) == 0) { printf("✓ 队列名称已删除,但队列仍存在(消费者仍在使用)\n"); } else { printf("✗ 删除队列名称失败: %s\n", strerror(errno)); } // 等待消费者完成 printf("管理进程等待消费者完成...\n"); for (int i = 0; i < 2; i++) { waitpid(consumers[i], NULL, 0); } printf("所有消费者已完成\n"); // 现在队列才会被真正销毁(所有描述符都已关闭) printf("队列已被真正销毁\n"); mq_close(mq); printf("管理进程完成\n"); }
int main() { printf("=== 多进程共享队列删除管理示例 ===\n"); const char* queue_name = "/shared_queue"; // 启动管理进程 pid_t manager = fork(); if (manager == 0) { manager_process(queue_name); exit(EXIT_SUCCESS); } // 父进程等待管理进程完成 waitpid(manager, NULL, 0); // 验证队列是否已被删除 printf("\n验证队列删除效果:\n"); mqd_t mq = mq_open(queue_name, O_RDONLY); if (mq == -1) { printf("✓ 队列已成功删除: %s\n", strerror(errno)); } else { printf("✗ 队列仍然存在\n"); mq_close(mq); } printf("\n=== 多进程队列管理演示完成 ===\n"); return 0; }
|