1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <signal.h> #include <time.h> #include <pthread.h>
// 消息优先级枚举 enum message_priority { PRIORITY_LOW = 10, PRIORITY_NORMAL = 50, PRIORITY_HIGH = 100, PRIORITY_CRITICAL = 200 };
// 通用消息结构体 struct unified_message { int priority; time_t timestamp; pid_t sender_pid; char type[32]; size_t data_size; char data[512]; };
// 消息处理器函数指针 typedef void (*message_handler_t)(const struct unified_message *msg);
// 消息系统接口 struct message_system { const char *name; int (*send_func)(const struct unified_message *msg); int (*receive_func)(struct unified_message *msg, int timeout_ms); int (*init_func)(void); void (*cleanup_func)(void); };
// 实时信号消息系统 static int rt_signal_send(const struct unified_message *msg) { union sigval data; data.sival_int = msg->priority; // 使用 SIGRTMIN + 0 作为消息信号 return sigqueue(getpid(), SIGRTMIN, data); }
static int rt_signal_receive(struct unified_message *msg, int timeout_ms) { // 这里简化处理,实际需要信号处理机制 sleep(timeout_ms / 1000); return -1; // 超时 }
// 自定义队列消息系统 static struct unified_message *message_queue[100]; static int queue_head = 0, queue_tail = 0, queue_count = 0; static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static int custom_queue_send(const struct unified_message *msg) { pthread_mutex_lock(&queue_mutex); if (queue_count >= 100) { pthread_mutex_unlock(&queue_mutex); return -1; // 队列满 } // 分配消息内存 struct unified_message *new_msg = malloc(sizeof(struct unified_message)); if (!new_msg) { pthread_mutex_unlock(&queue_mutex); return -1; } memcpy(new_msg, msg, sizeof(struct unified_message)); new_msg->timestamp = time(NULL); // 按优先级插入队列 int insert_pos = queue_head; for (int i = 0; i < queue_count; i++) { int pos = (queue_head + i) % 100; if (message_queue[pos]->priority < new_msg->priority) { insert_pos = pos; break; } } // 移动后续消息 for (int i = queue_count; i > insert_pos; i--) { int from_pos = (queue_head + i - 1) % 100; int to_pos = (queue_head + i) % 100; message_queue[to_pos] = message_queue[from_pos]; } message_queue[insert_pos] = new_msg; queue_count++; pthread_mutex_unlock(&queue_mutex); return 0; }
static int custom_queue_receive(struct unified_message *msg, int timeout_ms) { time_t start_time = time(NULL); while (1) { pthread_mutex_lock(&queue_mutex); if (queue_count > 0) { // 取出最高优先级消息 struct unified_message *highest_msg = message_queue[queue_head]; memcpy(msg, highest_msg, sizeof(struct unified_message)); free(highest_msg); queue_head = (queue_head + 1) % 100; queue_count--; pthread_mutex_unlock(&queue_mutex); return 0; } pthread_mutex_unlock(&queue_mutex); // 检查超时 if (timeout_ms > 0) { time_t elapsed = time(NULL) - start_time; if (elapsed * 1000 >= timeout_ms) { return -1; // 超时 } } usleep(10000); // 休眠 10ms } return -1; }
// 消息系统实现 static struct message_system available_systems[] = { { .name = "custom_queue", .send_func = custom_queue_send, .receive_func = custom_queue_receive, .init_func = NULL, .cleanup_func = NULL }, { .name = "rt_signal", .send_func = rt_signal_send, .receive_func = rt_signal_receive, .init_func = NULL, .cleanup_func = NULL } };
// 当前使用的消息系统 static struct message_system *current_system = &available_systems[0];
// 发送统一消息 int send_unified_message(int priority, const char *type, const void *data, size_t data_size) { struct unified_message msg; msg.priority = priority; msg.timestamp = time(NULL); msg.sender_pid = getpid(); strncpy(msg.type, type, sizeof(msg.type) - 1); msg.type[sizeof(msg.type) - 1] = '\0'; msg.data_size = (data_size < sizeof(msg.data)) ? data_size : sizeof(msg.data); if (data && data_size > 0) { memcpy(msg.data, data, msg.data_size); } if (current_system->send_func) { return current_system->send_func(&msg); } return -1; }
// 接收统一消息 int receive_unified_message(struct unified_message *msg, int timeout_ms) { if (current_system->receive_func) { return current_system->receive_func(msg, timeout_ms); } return -1; }
// 演示不同优先级消息处理 void demonstrate_priority_handling() { printf("=== 优先级消息处理演示 ===\n\n"); // 发送不同优先级的消息 printf("发送不同优先级的消息:\n"); const char *critical_msg = "系统紧急告警:磁盘空间不足"; send_unified_message(PRIORITY_CRITICAL, "ALERT", critical_msg, strlen(critical_msg)); printf(" ✓ 发送关键优先级消息 (优先级 %d)\n", PRIORITY_CRITICAL); const char *high_msg = "应用程序错误:数据库连接失败"; send_unified_message(PRIORITY_HIGH, "ERROR", high_msg, strlen(high_msg)); printf(" ✓ 发送高优先级消息 (优先级 %d)\n", PRIORITY_HIGH); const char *normal_msg = "用户登录成功"; send_unified_message(PRIORITY_NORMAL, "INFO", normal_msg, strlen(normal_msg)); printf(" ✓ 发送普通优先级消息 (优先级 %d)\n", PRIORITY_NORMAL); const char *low_msg = "系统日志:定时任务执行完成"; send_unified_message(PRIORITY_LOW, "DEBUG", low_msg, strlen(low_msg)); printf(" ✓ 发送低优先级消息 (优先级 %d)\n", PRIORITY_LOW); printf("\n接收消息 (按优先级顺序):\n"); // 接收并显示消息 struct unified_message received_msg; for (int i = 0; i < 4; i++) { if (receive_unified_message(&received_msg, 1000) == 0) { printf(" [%d] 优先级 %d (%s): %.*s\n", i + 1, received_msg.priority, received_msg.type, (int)received_msg.data_size, received_msg.data); } else { printf(" [%d] 超时或无消息\n", i + 1); } } }
int main() { printf("=== putpmsg 现代替代方案演示 ===\n\n"); printf("putpmsg 替代方案概述:\n"); printf("1. 实时信号 (RT signals)\n"); printf("2. Unix 域套接字\n"); printf("3. 管道和 FIFO\n"); printf("4. D-Bus 消息系统\n"); printf("5. 自定义优先级队列\n"); printf("6. POSIX 消息队列\n"); printf("7. epoll + 管道\n"); printf("\n"); // 演示优先级处理 demonstrate_priority_handling(); printf("\n=== 各方案特点对比 ===\n"); printf("方案 优先级支持 跨进程 复杂度 性能\n"); printf("------------- ---------- ------- ------ ----\n"); printf("实时信号 中等 是 低 高\n"); printf("Unix套接字 无 是 中 中\n"); printf("管道/FIFO 无 是 低 中\n"); printf("D-Bus 高 是 高 中\n"); printf("自定义队列 高 否 中 高\n"); printf("POSIX消息队列 高 是 中 高\n"); printf("epoll+管道 高 是 高 高\n"); printf("\n"); printf("=== 选择建议 ===\n"); printf("简单应用: 使用实时信号或管道\n"); printf("复杂系统: 使用 POSIX 消息队列\n"); printf("高性能: 使用 epoll + 管道\n"); printf("企业级: 使用 D-Bus\n"); printf("跨语言: 使用 D-Bus 或 Unix 套接字\n"); return 0; }
|