1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
| #include <sys/sem.h> #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <time.h>
#define BUFFER_SIZE 5 #define MAX_ITEMS 10
union semun { int val; struct semid_ds *buf; unsigned short *array; };
// 信号量索引定义 #define EMPTY_SEM 0 // 空槽位计数 #define FULL_SEM 1 // 满槽位计数 #define MUTEX_SEM 2 // 缓冲区互斥锁
/** * 初始化生产者-消费者信号量 */ int init_pc_semaphores(key_t key) { int semid; union semun arg; unsigned short values[3] = {BUFFER_SIZE, 0, 1}; // empty, full, mutex // 创建包含3个信号量的信号量集 semid = semget(key, 3, IPC_CREAT | 0666); if (semid == -1) { perror("创建信号量集失败"); return -1; } // 初始化信号量值 arg.array = values; if (semctl(semid, 0, SETALL, arg) == -1) { perror("初始化信号量失败"); semctl(semid, 0, IPC_RMID); return -1; } return semid; }
/** * 生产者操作 */ int producer_operation(int semid, int item, int timeout_sec) { struct sembuf sops[2]; struct timespec timeout; // 设置超时时间 timeout.tv_sec = timeout_sec; timeout.tv_nsec = 0; // 1. 等待空槽位 (P(empty)) sops[0].sem_num = EMPTY_SEM; sops[0].sem_op = -1; sops[0].sem_flg = 0; // 2. 获取缓冲区互斥锁 (P(mutex)) sops[1].sem_num = MUTEX_SEM; sops[1].sem_op = -1; sops[1].sem_flg = 0; printf("生产者: 尝试生产项目 %d\n", item); // 执行P操作 if (semtimedop(semid, sops, 2, &timeout) == -1) { if (errno == EAGAIN) { printf("生产者: 等待超时,无法生产项目 %d\n", item); } else { printf("生产者: 操作失败: %s\n", strerror(errno)); } return -1; } // 模拟生产过程 printf("生产者: 正在生产项目 %d\n", item); sleep(1); printf("生产者: 生产完成项目 %d\n", item); // 释放互斥锁和增加满槽位计数 sops[0].sem_num = MUTEX_SEM; sops[0].sem_op = 1; // V(mutex) sops[1].sem_num = FULL_SEM; sops[1].sem_op = 1; // V(full) if (semop(semid, sops, 2) == -1) { perror("生产者: 释放信号量失败"); return -1; } return 0; }
/** * 消费者操作 */ int consumer_operation(int semid, int *item, int timeout_sec) { struct sembuf sops[2]; struct timespec timeout; // 设置超时时间 timeout.tv_sec = timeout_sec; timeout.tv_nsec = 0; // 1. 等待满槽位 (P(full)) sops[0].sem_num = FULL_SEM; sops[0].sem_op = -1; sops[0].sem_flg = 0; // 2. 获取缓冲区互斥锁 (P(mutex)) sops[1].sem_num = MUTEX_SEM; sops[1].sem_op = -1; sops[1].sem_flg = 0; printf("消费者: 尝试消费项目\n"); // 执行P操作 if (semtimedop(semid, sops, 2, &timeout) == -1) { if (errno == EAGAIN) { printf("消费者: 等待超时,无项目可消费\n"); } else { printf("消费者: 操作失败: %s\n", strerror(errno)); } return -1; } // 模拟消费过程 *item = rand() % 100; // 模拟消费的项目 printf("消费者: 正在消费项目 %d\n", *item); sleep(1); printf("消费者: 消费完成项目 %d\n", *item); // 释放互斥锁和增加空槽位计数 sops[0].sem_num = MUTEX_SEM; sops[0].sem_op = 1; // V(mutex) sops[1].sem_num = EMPTY_SEM; sops[1].sem_op = 1; // V(empty) if (semop(semid, sops, 2) == -1) { perror("消费者: 释放信号量失败"); return -1; } return 0; }
/** * 演示生产者-消费者模型 */ int demo_producer_consumer() { int semid; key_t key = ftok(".", 'p'); printf("=== 生产者-消费者模型演示 ===\n"); printf("缓冲区大小: %d\n", BUFFER_SIZE); printf("生产/消费项目数: %d\n", MAX_ITEMS); // 初始化信号量 semid = init_pc_semaphores(key); if (semid == -1) { return -1; } printf("创建生产者-消费者信号量集,ID: %d\n", semid); // 创建生产者进程 if (fork() == 0) { // 生产者进程 srand(getpid()); printf("生产者进程启动 (PID: %d)\n", getpid()); for (int i = 1; i <= MAX_ITEMS; i++) { if (producer_operation(semid, i, 5) == -1) { printf("生产者: 生产项目 %d 失败\n", i); break; } sleep(1); // 生产间隔 } printf("生产者进程结束\n"); exit(0); } // 创建消费者进程 if (fork() == 0) { // 消费者进程 srand(getpid() + 1000); printf("消费者进程启动 (PID: %d)\n", getpid()); for (int i = 1; i <= MAX_ITEMS; i++) { int item; if (consumer_operation(semid, &item, 5) == -1) { printf("消费者: 消费项目失败\n"); break; } sleep(2); // 消费间隔 } printf("消费者进程结束\n"); exit(0); } // 父进程等待子进程完成 int status; wait(&status); wait(&status); // 显示最终信号量状态 union semun arg; unsigned short values[3]; arg.array = values; if (semctl(semid, 0, GETALL, arg) != -1) { printf("\n最终信号量状态:\n"); printf(" 空槽位: %d\n", values[EMPTY_SEM]); printf(" 满槽位: %d\n", values[FULL_SEM]); printf(" 互斥锁: %d\n", values[MUTEX_SEM]); } // 清理信号量集 semctl(semid, 0, IPC_RMID); return 0; }
int main() { return demo_producer_consumer(); }
|