多线程 + io_uring
实现高效大文件写入(64MB数据块,2GB文件分割)
以下是完整的代码实现,使用 两个线程:
- 生产者线程:生成 64MB 数据块,放入队列。
- 消费者线程:从队列取出数据,通过
io_uring
异步写入文件,并在文件超过 2GB 时自动切分。
1. 完整代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>
#include <sys/stat.h>
#include <stdatomic.h>
#define BLOCK_SIZE (64 * 1024 * 1024) // 64MB 数据块
#define MAX_FILE_SIZE (2ULL * 1024 * 1024 * 1024) // 2GB 文件分割阈值
#define QUEUE_SIZE 8 // 队列容量(防止内存爆炸)
// 数据块结构
typedef struct {
char *data;
size_t size;
} DataBlock;
// 线程安全队列
typedef struct {
DataBlock blocks[QUEUE_SIZE];
atomic_int head, tail;
pthread_mutex_t mutex;
pthread_cond_t not_empty, not_full;
} BlockQueue;
// 全局队列
BlockQueue block_queue;
atomic_int file_counter = 0; // 文件计数器(用于切分)
atomic_ullong current_file_size = 0; // 当前文件大小
// 初始化队列
void init_queue(BlockQueue *q) {
q->head = q->tail = 0;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->not_empty, NULL);
pthread_cond_init(&q->not_full, NULL);
}
// 生产者:生成随机数据并放入队列
void *producer_thread(void *arg) {
while (1) {
DataBlock block;
block.data = malloc(BLOCK_SIZE);
if (!block.data) {
perror("malloc");
exit(EXIT_FAILURE);
}
block.size = BLOCK_SIZE;
// 填充随机数据
for (size_t i = 0; i < BLOCK_SIZE; i++) {
block.data[i] = rand() % 256;
}
// 放入队列
pthread_mutex_lock(&block_queue.mutex);
while ((block_queue.tail + 1) % QUEUE_SIZE == block_queue.head) {
pthread_cond_wait(&block_queue.not_full, &block_queue.mutex);
}
block_queue.blocks[block_queue.tail] = block;
block_queue.tail = (block_queue.tail + 1) % QUEUE_SIZE;
pthread_cond_signal(&block_queue.not_empty);
pthread_mutex_unlock(&block_queue.mutex);
}
return NULL;
}
// 消费者:从队列取出数据,用 io_uring 写入文件
void *consumer_thread(void *arg) {
struct io_uring ring;
int fd = -1;
char filename[256];
// 初始化 io_uring
if (io_uring_queue_init(8, &ring, 0) < 0) {
perror("io_uring_queue_init");
exit(EXIT_FAILURE);
}
while (1) {
DataBlock block;
// 从队列取出数据
pthread_mutex_lock(&block_queue.mutex);
while (block_queue.head == block_queue.tail) {
pthread_cond_wait(&block_queue.not_empty, &block_queue.mutex);
}
block = block_queue.blocks[block_queue.head];
block_queue.head = (block_queue.head + 1) % QUEUE_SIZE;
pthread_cond_signal(&block_queue.not_full);
pthread_mutex_unlock(&block_queue.mutex);
// 检查是否需要切分文件
if (fd == -1 || current_file_size + block.size > MAX_FILE_SIZE) {
if (fd != -1) close(fd);
snprintf(filename, sizeof(filename), "large_file_%d.bin", file_counter++);
fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) {
perror("open");
exit(EXIT_FAILURE);
}
current_file_size = 0;
printf("Created new file: %s\n", filename);
}
// 提交异步写入请求
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_write(sqe, fd, block.data, block.size, current_file_size);
io_uring_sqe_set_data(sqe, block.data); // 关联数据块(用于释放)
io_uring_submit(&ring);
// 等待写入完成
struct io_uring_cqe *cqe;
int ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
perror("io_uring_wait_cqe");
exit(EXIT_FAILURE);
}
if (cqe->res < 0) {
fprintf(stderr, "Write error: %s\n", strerror(-cqe->res));
exit(EXIT_FAILURE);
}
// 更新文件大小并释放内存
current_file_size += cqe->res;
io_uring_cqe_seen(&ring, cqe);
free(block.data);
}
io_uring_queue_exit(&ring);
return NULL;
}
int main() {
pthread_t producer, consumer;
// 初始化队列
init_queue(&block_queue);
// 启动生产者线程
if (pthread_create(&producer, NULL, producer_thread, NULL) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
// 启动消费者线程
if (pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) {
perror("pthread_create");
exit(EXIT_FAILURE);
}
// 等待线程结束(实际上不会结束)
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
return 0;
}
2. 关键设计
(1)线程分工
线程 | 任务 |
---|---|
生产者线程 | 生成 64MB 随机数据,放入队列 |
消费者线程 | 从队列取出数据,用 io_uring 异步写入文件 |
(2)线程安全队列
- 环形缓冲区(
BlockQueue
)避免频繁malloc/free
。 - 互斥锁(
pthread_mutex_t
) + 条件变量(pthread_cond_t
) 保证线程安全:not_empty
:队列非空时唤醒消费者。not_full
:队列未满时唤醒生产者。
(3)文件切分逻辑
-
current_file_size
记录当前文件大小。 - 超过 2GB 时:
- 关闭当前文件。
- 创建新文件(
large_file_0.bin
,large_file_1.bin
, …)。
(4)io_uring
优化
- 批量化提交:可调整
QUEUE_SIZE
提高并发。 - 内存对齐:建议
posix_memalign
分配内存(减少内核拷贝)。 - 轮询模式:可启用
IORING_SETUP_SQPOLL
减少系统调用(需root)。
3. 编译与运行
gcc -o io_uring_multi_thread io_uring_multi_thread.c -luring -lpthread
./io_uring_multi_thread
输出示例:
Created new file: large_file_0.bin
Created new file: large_file_1.bin
...
4. 验证结果
ls -lh large_file_*.bin # 检查文件大小
md5sum large_file_*.bin # 验证数据完整性
5. 性能优化建议
优化点 | 说明 |
---|---|
内存池 | 预分配多个 64MB 块,减少 malloc/free 开销 |
批量提交 | 一次提交多个 io_uring 请求(提高吞吐) |
O_DIRECT | 直接 I/O 绕过页缓存(需内存对齐) |
多消费者线程 | 多个消费者线程并行处理队列 |
6. 总结
- 生产者-消费者模型:解耦数据生成和写入,提高并行度。
-
io_uring
异步 I/O:最大化磁盘写入性能。 - 自动文件切分:避免单个文件过大(2GB 限制)。
适用于 日志系统、数据库、大数据存储 等场景。 🚀