多线程 + io_uring 实现高效大文件写入

多线程 + io_uring 实现高效大文件写入(64MB数据块,2GB文件分割)​​

以下是完整的代码实现,使用 ​​两个线程​​:

  1. ​生产者线程​​:生成 ​​64MB 数据块​​,放入队列。
  2. ​消费者线程​​:从队列取出数据,通过 io_uring​异步写入文件​​,并在文件超过 ​​2GB 时自动切分​​。

​1. 完整代码​

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>
#include <sys/stat.h>
#include <stdatomic.h>

#define BLOCK_SIZE (64 * 1024 * 1024)  // 64MB 数据块
#define MAX_FILE_SIZE (2ULL * 1024 * 1024 * 1024)  // 2GB 文件分割阈值
#define QUEUE_SIZE 8  // 队列容量(防止内存爆炸)

// 数据块结构
typedef struct {
    char *data;
    size_t size;
} DataBlock;

// 线程安全队列
typedef struct {
    DataBlock blocks[QUEUE_SIZE];
    atomic_int head, tail;
    pthread_mutex_t mutex;
    pthread_cond_t not_empty, not_full;
} BlockQueue;

// 全局队列
BlockQueue block_queue;
atomic_int file_counter = 0;  // 文件计数器(用于切分)
atomic_ullong current_file_size = 0;  // 当前文件大小

// 初始化队列
void init_queue(BlockQueue *q) {
    q->head = q->tail = 0;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

// 生产者:生成随机数据并放入队列
void *producer_thread(void *arg) {
    while (1) {
        DataBlock block;
        block.data = malloc(BLOCK_SIZE);
        if (!block.data) {
            perror("malloc");
            exit(EXIT_FAILURE);
        }
        block.size = BLOCK_SIZE;

        // 填充随机数据
        for (size_t i = 0; i < BLOCK_SIZE; i++) {
            block.data[i] = rand() % 256;
        }

        // 放入队列
        pthread_mutex_lock(&block_queue.mutex);
        while ((block_queue.tail + 1) % QUEUE_SIZE == block_queue.head) {
            pthread_cond_wait(&block_queue.not_full, &block_queue.mutex);
        }
        block_queue.blocks[block_queue.tail] = block;
        block_queue.tail = (block_queue.tail + 1) % QUEUE_SIZE;
        pthread_cond_signal(&block_queue.not_empty);
        pthread_mutex_unlock(&block_queue.mutex);
    }
    return NULL;
}

// 消费者:从队列取出数据,用 io_uring 写入文件
void *consumer_thread(void *arg) {
    struct io_uring ring;
    int fd = -1;
    char filename[256];

    // 初始化 io_uring
    if (io_uring_queue_init(8, &ring, 0) < 0) {
        perror("io_uring_queue_init");
        exit(EXIT_FAILURE);
    }

    while (1) {
        DataBlock block;

        // 从队列取出数据
        pthread_mutex_lock(&block_queue.mutex);
        while (block_queue.head == block_queue.tail) {
            pthread_cond_wait(&block_queue.not_empty, &block_queue.mutex);
        }
        block = block_queue.blocks[block_queue.head];
        block_queue.head = (block_queue.head + 1) % QUEUE_SIZE;
        pthread_cond_signal(&block_queue.not_full);
        pthread_mutex_unlock(&block_queue.mutex);

        // 检查是否需要切分文件
        if (fd == -1 || current_file_size + block.size > MAX_FILE_SIZE) {
            if (fd != -1) close(fd);
            snprintf(filename, sizeof(filename), "large_file_%d.bin", file_counter++);
            fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);
            if (fd < 0) {
                perror("open");
                exit(EXIT_FAILURE);
            }
            current_file_size = 0;
            printf("Created new file: %s\n", filename);
        }

        // 提交异步写入请求
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        io_uring_prep_write(sqe, fd, block.data, block.size, current_file_size);
        io_uring_sqe_set_data(sqe, block.data);  // 关联数据块(用于释放)
        io_uring_submit(&ring);

        // 等待写入完成
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            perror("io_uring_wait_cqe");
            exit(EXIT_FAILURE);
        }
        if (cqe->res < 0) {
            fprintf(stderr, "Write error: %s\n", strerror(-cqe->res));
            exit(EXIT_FAILURE);
        }

        // 更新文件大小并释放内存
        current_file_size += cqe->res;
        io_uring_cqe_seen(&ring, cqe);
        free(block.data);
    }

    io_uring_queue_exit(&ring);
    return NULL;
}

int main() {
    pthread_t producer, consumer;

    // 初始化队列
    init_queue(&block_queue);

    // 启动生产者线程
    if (pthread_create(&producer, NULL, producer_thread, NULL) != 0) {
        perror("pthread_create");
        exit(EXIT_FAILURE);
    }

    // 启动消费者线程
    if (pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) {
        perror("pthread_create");
        exit(EXIT_FAILURE);
    }

    // 等待线程结束(实际上不会结束)
    pthread_join(producer, NULL);
    pthread_join(consumer, NULL);

    return 0;
}

​2. 关键设计​

​(1)线程分工​

线程任务
​生产者线程​生成 ​​64MB 随机数据​​,放入队列
​消费者线程​从队列取出数据,用 io_uring​异步写入文件​

​(2)线程安全队列​

  • ​环形缓冲区​​(BlockQueue)避免频繁 malloc/free
  • ​互斥锁(pthread_mutex_t)​​ + ​​条件变量(pthread_cond_t)​​ 保证线程安全:
    • not_empty:队列非空时唤醒消费者。
    • not_full:队列未满时唤醒生产者。

​(3)文件切分逻辑​

  • current_file_size​ 记录当前文件大小。
  • ​超过 2GB 时​​:
    • 关闭当前文件。
    • 创建新文件(large_file_0.bin, large_file_1.bin, …)。

​(4)io_uring 优化​

  • ​批量化提交​​:可调整 QUEUE_SIZE 提高并发。
  • ​内存对齐​​:建议 posix_memalign 分配内存(减少内核拷贝)。
  • ​轮询模式​​:可启用 IORING_SETUP_SQPOLL 减少系统调用(需root)。

​3. 编译与运行​

gcc -o io_uring_multi_thread io_uring_multi_thread.c -luring -lpthread
./io_uring_multi_thread

​输出示例​​:

Created new file: large_file_0.bin
Created new file: large_file_1.bin
...

​4. 验证结果​

ls -lh large_file_*.bin  # 检查文件大小
md5sum large_file_*.bin  # 验证数据完整性

​5. 性能优化建议​

优化点说明
​内存池​预分配多个 64MB 块,减少 malloc/free 开销
​批量提交​一次提交多个 io_uring 请求(提高吞吐)
​O_DIRECT​直接 I/O 绕过页缓存(需内存对齐)
​多消费者线程​多个消费者线程并行处理队列

​6. 总结​

  • ​生产者-消费者模型​​:解耦数据生成和写入,提高并行度。
  • io_uring 异步 I/O​​:最大化磁盘写入性能。
  • ​自动文件切分​​:避免单个文件过大(2GB 限制)。

适用于 ​​日志系统、数据库、大数据存储​​ 等场景。 🚀

此条目发表在未分类分类目录。将固定链接加入收藏夹。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注