1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| #include <stdio.h> #include <stdlib.h> #include <string.h> #include <fcntl.h> #include <unistd.h> #include <pthread.h> #include <liburing.h> #include <sys/stat.h> #include <stdatomic.h>
/* Size in bytes of every generated data block: 64 MiB. */
#define BLOCK_SIZE (64 << 20)

/* Threshold in bytes at which output is split into a new file: 2 GiB. */
#define MAX_FILE_SIZE (2ULL << 30)

/*
 * Number of slots in the block ring buffer. Bounding the queue keeps the
 * amount of memory held by in-flight blocks from growing without limit.
 */
#define QUEUE_SIZE 8
/* One chunk of data handed from the producer to the consumer. */
typedef struct {
    char *data;  /* buffer holding the payload bytes */
    size_t size; /* number of bytes stored in data */
} DataBlock;
// 线程安全队列 typedef struct { DataBlock blocks[QUEUE_SIZE]; atomic_int head, tail; pthread_mutex_t mutex; pthread_cond_t not_empty, not_full; } BlockQueue;
// 全局队列 BlockQueue block_queue; atomic_int file_counter = 0; // 文件计数器(用于切分) atomic_ullong current_file_size = 0; // 当前文件大小
// 初始化队列 void init_queue(BlockQueue *q) { q->head = q->tail = 0; pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->not_empty, NULL); pthread_cond_init(&q->not_full, NULL); }
// 生产者:生成随机数据并放入队列 void *producer_thread(void *arg) { while (1) { DataBlock block; block.data = malloc(BLOCK_SIZE); if (!block.data) { perror("malloc"); exit(EXIT_FAILURE); } block.size = BLOCK_SIZE;
// 填充随机数据 for (size_t i = 0; i < BLOCK_SIZE; i++) { block.data[i] = rand() % 256; }
// 放入队列 pthread_mutex_lock(&block_queue.mutex); while ((block_queue.tail + 1) % QUEUE_SIZE == block_queue.head) { pthread_cond_wait(&block_queue.not_full, &block_queue.mutex); } block_queue.blocks[block_queue.tail] = block; block_queue.tail = (block_queue.tail + 1) % QUEUE_SIZE; pthread_cond_signal(&block_queue.not_empty); pthread_mutex_unlock(&block_queue.mutex); } return NULL; }
// 消费者:从队列取出数据,用 io_uring 写入文件 void *consumer_thread(void *arg) { struct io_uring ring; int fd = -1; char filename[256];
// 初始化 io_uring if (io_uring_queue_init(8, &ring, 0) < 0) { perror("io_uring_queue_init"); exit(EXIT_FAILURE); }
while (1) { DataBlock block;
// 从队列取出数据 pthread_mutex_lock(&block_queue.mutex); while (block_queue.head == block_queue.tail) { pthread_cond_wait(&block_queue.not_empty, &block_queue.mutex); } block = block_queue.blocks[block_queue.head]; block_queue.head = (block_queue.head + 1) % QUEUE_SIZE; pthread_cond_signal(&block_queue.not_full); pthread_mutex_unlock(&block_queue.mutex);
// 检查是否需要切分文件 if (fd == -1 || current_file_size + block.size > MAX_FILE_SIZE) { if (fd != -1) close(fd); snprintf(filename, sizeof(filename), "large_file_%d.bin", file_counter++); fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644); if (fd < 0) { perror("open"); exit(EXIT_FAILURE); } current_file_size = 0; printf("Created new file: %s\n", filename); }
// 提交异步写入请求 struct io_uring_sqe *sqe = io_uring_get_sqe(&ring); io_uring_prep_write(sqe, fd, block.data, block.size, current_file_size); io_uring_sqe_set_data(sqe, block.data); // 关联数据块(用于释放) io_uring_submit(&ring);
// 等待写入完成 struct io_uring_cqe *cqe; int ret = io_uring_wait_cqe(&ring, &cqe); if (ret < 0) { perror("io_uring_wait_cqe"); exit(EXIT_FAILURE); } if (cqe->res < 0) { fprintf(stderr, "Write error: %s\n", strerror(-cqe->res)); exit(EXIT_FAILURE); }
// 更新文件大小并释放内存 current_file_size += cqe->res; io_uring_cqe_seen(&ring, cqe); free(block.data); }
io_uring_queue_exit(&ring); return NULL; }
int main() { pthread_t producer, consumer;
// 初始化队列 init_queue(&block_queue);
// 启动生产者线程 if (pthread_create(&producer, NULL, producer_thread, NULL) != 0) { perror("pthread_create"); exit(EXIT_FAILURE); }
// 启动消费者线程 if (pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) { perror("pthread_create"); exit(EXIT_FAILURE); }
// 等待线程结束(实际上不会结束) pthread_join(producer, NULL); pthread_join(consumer, NULL);
return 0; }
|