writev 函数详解
1. 函数介绍
writev
是Linux系统调用,用于向文件描述符写入多个分散的缓冲区数据(scatter-gather I/O)。它是 write
函数的增强版本,允许一次系统调用写入多个不连续的内存区域,减少了系统调用的开销,提高了I/O性能。
2. 函数原型
#include <sys/uio.h>
ssize_t writev(int fd, const struct iovec *iov, int iovcnt);
3. 功能
writev
将多个分散的缓冲区数据原子性地写入到指定的文件描述符中。它使用分散/聚集I/O(scatter-gather I/O)机制,可以显著减少系统调用次数,提高大量小数据块写入的性能。
4. 参数
- int fd: 目标文件描述符
- *const struct iovec iov: iovec结构体数组,描述多个缓冲区
- int iovcnt: iovec数组中的元素个数
5. 返回值
- 成功: 返回实际写入的字节数
- 失败: 返回-1,并设置errno
6. 相似函数,或关联函数
- readv: 对应的读取函数
- write: 基本写入函数
- sendmsg/recvmsg: 网络套接字的分散/聚集I/O
- preadv/pwritev: 带偏移量的分散/聚集I/O
7. 示例代码
示例1:基础writev使用
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
/**
* 演示基础writev使用方法
*/
int demo_writev_basic() {
const char *header = "HTTP/1.1 200 OK\r\n";
const char *content_type = "Content-Type: text/html\r\n";
const char *content_length = "Content-Length: 25\r\n";
const char *connection = "Connection: close\r\n";
const char *blank_line = "\r\n";
const char *body = "<html><body>Hello</body></html>";
struct iovec iov[6];
int fd;
ssize_t bytes_written;
printf("=== 基础writev使用示例 ===\n");
// 准备iovec数组
iov[0].iov_base = (void*)header;
iov[0].iov_len = strlen(header);
iov[1].iov_base = (void*)content_type;
iov[1].iov_len = strlen(content_type);
iov[2].iov_base = (void*)content_length;
iov[2].iov_len = strlen(content_length);
iov[3].iov_base = (void*)connection;
iov[3].iov_len = strlen(connection);
iov[4].iov_base = (void*)blank_line;
iov[4].iov_len = strlen(blank_line);
iov[5].iov_base = (void*)body;
iov[5].iov_len = strlen(body);
// 显示要写入的数据
printf("准备写入的数据:\n");
for (int i = 0; i < 6; i++) {
printf(" 缓冲区 %d: %.*s", i + 1, (int)iov[i].iov_len, (char*)iov[i].iov_base);
// 如果不是以换行符结尾,添加换行符
if (iov[i].iov_len > 0 && ((char*)iov[i].iov_base)[iov[i].iov_len - 1] != '\n') {
printf("\n");
}
}
printf("\n总数据长度: %zu 字节\n",
iov[0].iov_len + iov[1].iov_len + iov[2].iov_len +
iov[3].iov_len + iov[4].iov_len + iov[5].iov_len);
// 写入到标准输出(演示用途)
printf("\n1. 使用writev写入到标准输出:\n");
bytes_written = writev(STDOUT_FILENO, iov, 6);
if (bytes_written == -1) {
perror("writev 失败");
return -1;
}
printf(" 成功写入 %zd 字节\n", bytes_written);
// 写入到文件
printf("\n2. 使用writev写入到文件:\n");
fd = open("writev_output.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
if (fd == -1) {
perror("创建文件失败");
return -1;
}
bytes_written = writev(fd, iov, 6);
if (bytes_written == -1) {
perror("writev 写入文件失败");
close(fd);
return -1;
}
printf(" 成功写入文件 %zd 字节\n", bytes_written);
close(fd);
// 验证写入结果
printf("\n3. 验证写入结果:\n");
fd = open("writev_output.txt", O_RDONLY);
if (fd != -1) {
char buffer[1024];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 文件内容:\n%s", buffer);
}
close(fd);
unlink("writev_output.txt"); // 清理测试文件
}
return 0;
}
int main() {
return demo_writev_basic();
}
示例2:性能对比测试
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <sys/time.h>
/**
* 性能测试结构
*/
typedef struct {
const char *name;
double writev_time;
double write_time;
ssize_t total_bytes;
int operation_count;
} performance_test_t;
/**
* 使用writev进行批量写入
*/
ssize_t writev_bulk_write(int fd, const char **messages, int count) {
struct iovec *iov = malloc(count * sizeof(struct iovec));
if (!iov) {
return -1;
}
ssize_t total_written = 0;
// 准备iovec数组
for (int i = 0; i < count; i++) {
iov[i].iov_base = (void*)messages[i];
iov[i].iov_len = strlen(messages[i]);
}
// 执行writev写入
ssize_t result = writev(fd, iov, count);
if (result != -1) {
total_written = result;
}
free(iov);
return total_written;
}
/**
* 使用多次write进行批量写入
*/
ssize_t write_bulk_write(int fd, const char **messages, int count) {
ssize_t total_written = 0;
for (int i = 0; i < count; i++) {
ssize_t result = write(fd, messages[i], strlen(messages[i]));
if (result == -1) {
return -1;
}
total_written += result;
}
return total_written;
}
/**
* 获取当前时间(微秒)
*/
long long get_current_time_us() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec * 1000000LL + tv.tv_usec;
}
/**
* 演示writev性能对比
*/
int demo_writev_performance_comparison() {
const int message_count = 1000;
const char *test_messages[1000];
int fd_writev, fd_write;
performance_test_t test_results;
printf("=== writev vs write 性能对比测试 ===\n");
// 准备测试消息
printf("1. 准备测试数据:\n");
char **message_buffers = malloc(message_count * sizeof(char*));
if (!message_buffers) {
perror("分配消息缓冲区失败");
return -1;
}
for (int i = 0; i < message_count; i++) {
message_buffers[i] = malloc(64);
if (message_buffers[i]) {
snprintf(message_buffers[i], 64, "Test message %d: Hello World!\n", i + 1);
test_messages[i] = message_buffers[i];
} else {
printf("分配消息 %d 失败\n", i);
// 清理已分配的缓冲区
for (int j = 0; j < i; j++) {
free(message_buffers[j]);
}
free(message_buffers);
return -1;
}
}
printf(" 准备了 %d 条测试消息\n", message_count);
// 计算总数据量
size_t total_data_size = 0;
for (int i = 0; i < message_count; i++) {
total_data_size += strlen(test_messages[i]);
}
printf(" 总数据量: %zu 字节 (%.2f KB)\n", total_data_size, total_data_size / 1024.0);
// 创建测试文件
printf("\n2. 创建测试文件:\n");
fd_writev = open("writev_test.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
fd_write = open("write_test.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
if (fd_writev == -1 || fd_write == -1) {
perror("创建测试文件失败");
if (fd_writev != -1) close(fd_writev);
if (fd_write != -1) close(fd_write);
// 清理消息缓冲区
for (int i = 0; i < message_count; i++) {
free(message_buffers[i]);
}
free(message_buffers);
return -1;
}
printf(" writev测试文件: writev_test.txt\n");
printf(" write测试文件: write_test.txt\n");
// writev性能测试
printf("\n3. writev性能测试:\n");
long long start_time = get_current_time_us();
ssize_t writev_bytes = writev_bulk_write(fd_writev, test_messages, message_count);
if (writev_bytes == -1) {
perror("writev测试失败");
close(fd_writev);
close(fd_write);
unlink("writev_test.txt");
unlink("write_test.txt");
// 清理消息缓冲区
for (int i = 0; i < message_count; i++) {
free(message_buffers[i]);
}
free(message_buffers);
return -1;
}
long long end_time = get_current_time_us();
double writev_time = (end_time - start_time) / 1000.0; // 转换为毫秒
printf(" writev写入字节数: %zd\n", writev_bytes);
printf(" writev耗时: %.3f 毫秒\n", writev_time);
close(fd_writev);
// write性能测试
printf("\n4. write性能测试:\n");
start_time = get_current_time_us();
ssize_t write_bytes = write_bulk_write(fd_write, test_messages, message_count);
if (write_bytes == -1) {
perror("write测试失败");
close(fd_write);
unlink("writev_test.txt");
unlink("write_test.txt");
// 清理消息缓冲区
for (int i = 0; i < message_count; i++) {
free(message_buffers[i]);
}
free(message_buffers);
return -1;
}
end_time = get_current_time_us();
double write_time = (end_time - start_time) / 1000.0; // 转换为毫秒
printf(" write写入字节数: %zd\n", write_bytes);
printf(" write耗时: %.3f 毫秒\n", write_time);
close(fd_write);
// 显示性能对比结果
printf("\n5. 性能对比结果:\n");
printf(" 数据总量: %zu 字节\n", total_data_size);
printf(" writev操作: %d 次系统调用\n", 1);
printf(" write操作: %d 次系统调用\n", message_count);
printf(" writev耗时: %.3f 毫秒\n", writev_time);
printf(" write耗时: %.3f 毫秒\n", write_time);
if (write_time > 0 && writev_time > 0) {
double speedup = write_time / writev_time;
double reduction = (write_time - writev_time) / write_time * 100;
printf(" 性能提升: %.2f 倍\n", speedup);
printf(" 时间减少: %.1f%%\n", reduction);
}
// 验证数据一致性
printf("\n6. 数据一致性验证:\n");
if (writev_bytes == write_bytes) {
printf(" ✓ 写入字节数一致\n");
} else {
printf(" ✗ 写入字节数不一致 (writev: %zd, write: %zd)\n", writev_bytes, write_bytes);
}
// 清理测试文件
unlink("writev_test.txt");
unlink("write_test.txt");
// 清理消息缓冲区
for (int i = 0; i < message_count; i++) {
free(message_buffers[i]);
}
free(message_buffers);
// 显示性能分析
printf("\n=== 性能分析 ===\n");
printf("1. 系统调用开销:\n");
printf(" writev减少了 %d 次系统调用\n", message_count - 1);
printf(" 每次系统调用节省约 %.3f 微秒\n",
((write_time - writev_time) * 1000) / (message_count - 1));
printf("\n2. 适用场景:\n");
printf(" ✓ 大量小数据块写入\n");
printf(" ✓ 网络协议数据组装\n");
printf(" ✓ 日志文件批量写入\n");
printf(" ✓ 数据库事务日志\n");
printf("\n3. 性能优化建议:\n");
printf(" ✓ 合理设置iovec数组大小\n");
printf(" ✓ 避免过于频繁的writev调用\n");
printf(" ✓ 考虑使用异步I/O\n");
printf(" ✓ 监控系统调用性能\n");
return 0;
}
int main() {
return demo_writev_performance_comparison();
}
示例3:网络协议数据组装
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
/**
* HTTP响应结构
*/
typedef struct {
int status_code;
const char *status_text;
const char *content_type;
size_t content_length;
const char *headers[16];
int header_count;
} http_response_t;
/**
* 创建HTTP响应头部
*/
int create_http_headers(http_response_t *response, struct iovec *iov, int max_iov) {
int iov_index = 0;
// 状态行
char *status_line = malloc(64);
if (!status_line) return -1;
snprintf(status_line, 64, "HTTP/1.1 %d %s\r\n",
response->status_code, response->status_text);
iov[iov_index].iov_base = status_line;
iov[iov_index].iov_len = strlen(status_line);
iov_index++;
// Content-Type
char *content_type_header = malloc(64);
if (!content_type_header) {
free(status_line);
return -1;
}
snprintf(content_type_header, 64, "Content-Type: %s\r\n", response->content_type);
iov[iov_index].iov_base = content_type_header;
iov[iov_index].iov_len = strlen(content_type_header);
iov_index++;
// Content-Length
char *content_length_header = malloc(64);
if (!content_length_header) {
free(status_line);
free(content_type_header);
return -1;
}
snprintf(content_length_header, 64, "Content-Length: %zu\r\n", response->content_length);
iov[iov_index].iov_base = content_length_header;
iov[iov_index].iov_len = strlen(content_length_header);
iov_index++;
// 自定义头部
for (int i = 0; i < response->header_count && iov_index < max_iov - 2; i++) {
char *custom_header = strdup(response->headers[i]);
if (custom_header) {
iov[iov_index].iov_base = custom_header;
iov[iov_index].iov_len = strlen(custom_header);
iov_index++;
}
}
// 空行分隔符
char *blank_line = malloc(3);
if (!blank_line) {
// 清理已分配的内存
for (int i = 0; i < iov_index; i++) {
free(iov[i].iov_base);
}
return -1;
}
strcpy(blank_line, "\r\n");
iov[iov_index].iov_base = blank_line;
iov[iov_index].iov_len = 2;
iov_index++;
return iov_index;
}
/**
* 演示网络协议数据组装
*/
int demo_network_protocol_assembly() {
http_response_t response = {0};
struct iovec iov[32];
int iov_count;
int fd;
printf("=== 网络协议数据组装演示 ===\n");
// 初始化HTTP响应
printf("1. 初始化HTTP响应:\n");
response.status_code = 200;
response.status_text = "OK";
response.content_type = "application/json";
response.content_length = 45;
response.header_count = 2;
response.headers[0] = "Server: MyWebServer/1.0";
response.headers[1] = "Cache-Control: no-cache";
printf(" 状态码: %d %s\n", response.status_code, response.status_text);
printf(" 内容类型: %s\n", response.content_type);
printf(" 内容长度: %zu 字节\n", response.content_length);
printf(" 自定义头部: %d 个\n", response.header_count);
for (int i = 0; i < response.header_count; i++) {
printf(" %s\n", response.headers[i]);
}
// 创建响应内容
const char *content = "{\"message\":\"Hello World\",\"status\":\"success\"}";
// 创建HTTP头部
printf("\n2. 创建HTTP头部:\n");
iov_count = create_http_headers(&response, iov, 32);
if (iov_count == -1) {
printf("创建HTTP头部失败\n");
return -1;
}
printf(" 创建了 %d 个头部片段\n", iov_count);
// 添加响应内容
printf("\n3. 添加响应内容:\n");
if (iov_count < 32) {
iov[iov_count].iov_base = (void*)content;
iov[iov_count].iov_len = strlen(content);
iov_count++;
printf(" 添加响应内容: %s\n", content);
}
// 显示完整的HTTP响应
printf("\n4. 完整HTTP响应:\n");
for (int i = 0; i < iov_count; i++) {
printf("%.*s", (int)iov[i].iov_len, (char*)iov[i].iov_base);
}
// 计算总长度
size_t total_length = 0;
for (int i = 0; i < iov_count; i++) {
total_length += iov[i].iov_len;
}
printf("\n总响应长度: %zu 字节\n", total_length);
// 写入到文件(模拟网络发送)
printf("\n5. 写入到文件(模拟网络发送):\n");
fd = open("http_response.txt", O_CREAT | O_WRONLY | O_TRUNC, 0644);
if (fd == -1) {
perror("创建响应文件失败");
// 清理内存
for (int i = 0; i < iov_count; i++) {
free(iov[i].iov_base);
}
return -1;
}
ssize_t bytes_written = writev(fd, iov, iov_count);
if (bytes_written == -1) {
perror("writev 写入失败");
close(fd);
unlink("http_response.txt");
// 清理内存
for (int i = 0; i < iov_count; i++) {
free(iov[i].iov_base);
}
return -1;
}
printf(" 成功写入 %zd 字节到文件\n", bytes_written);
close(fd);
// 验证写入结果
printf("\n6. 验证写入结果:\n");
fd = open("http_response.txt", O_RDONLY);
if (fd != -1) {
char buffer[512];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 文件内容 (%zd 字节):\n", bytes_read);
printf("%s", buffer);
}
close(fd);
unlink("http_response.txt");
}
// 清理内存
for (int i = 0; i < iov_count; i++) {
free(iov[i].iov_base);
}
// 显示协议组装优势
printf("\n=== 协议组装优势 ===\n");
printf("1. 零拷贝组装:\n");
printf(" ✓ 避免数据拷贝操作\n");
printf(" ✓ 减少内存使用\n");
printf(" ✓ 提高组装效率\n");
printf("\n2. 原子性保证:\n");
printf(" ✓ 单次系统调用完成\n");
printf(" ✓ 数据完整性保证\n");
printf(" ✓ 避免部分写入问题\n");
printf("\n3. 灵活性:\n");
printf(" ✓ 动态头部组装\n");
printf(" ✓ 可变内容长度\n");
printf(" ✓ 复杂协议支持\n");
return 0;
}
int main() {
return demo_network_protocol_assembly();
}
示例4:日志系统优化
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <pthread.h>
/**
* 日志条目结构
*/
typedef struct {
time_t timestamp;
const char *level;
const char *module;
const char *message;
pid_t pid;
pthread_t tid;
} log_entry_t;
/**
* 日志缓冲区管理器
*/
typedef struct {
struct iovec *iov;
int capacity;
int count;
size_t total_size;
int fd;
} log_buffer_t;
/**
* 初始化日志缓冲区
*/
int init_log_buffer(log_buffer_t *buffer, int capacity, const char *filename) {
buffer->iov = malloc(capacity * sizeof(struct iovec));
if (!buffer->iov) {
return -1;
}
buffer->capacity = capacity;
buffer->count = 0;
buffer->total_size = 0;
// 打开日志文件
buffer->fd = open(filename, O_CREAT | O_WRONLY | O_APPEND, 0644);
if (buffer->fd == -1) {
free(buffer->iov);
return -1;
}
printf("日志缓冲区初始化完成:\n");
printf(" 缓冲区容量: %d 条目\n", capacity);
printf(" 日志文件: %s\n", filename);
return 0;
}
/**
* 格式化日志条目
*/
char* format_log_entry(const log_entry_t *entry) {
char *buffer = malloc(512);
if (!buffer) {
return NULL;
}
struct tm *tm_info = localtime(&entry->timestamp);
char time_str[32];
strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", tm_info);
snprintf(buffer, 512, "[%s] [%s] [%s] [PID:%d TID:%lu] %s\n",
time_str, entry->level, entry->module,
entry->pid, (unsigned long)entry->tid, entry->message);
return buffer;
}
/**
* 添加日志条目到缓冲区
*/
int add_log_entry(log_buffer_t *buffer, const log_entry_t *entry) {
if (buffer->count >= buffer->capacity) {
printf("日志缓冲区已满\n");
return -1;
}
char *formatted_entry = format_log_entry(entry);
if (!formatted_entry) {
return -1;
}
buffer->iov[buffer->count].iov_base = formatted_entry;
buffer->iov[buffer->count].iov_len = strlen(formatted_entry);
buffer->total_size += buffer->iov[buffer->count].iov_len;
buffer->count++;
return 0;
}
/**
* 冲刷日志缓冲区
*/
int flush_log_buffer(log_buffer_t *buffer) {
if (buffer->count == 0) {
return 0;
}
printf("冲刷日志缓冲区: %d 条目, %zu 字节\n", buffer->count, buffer->total_size);
ssize_t bytes_written = writev(buffer->fd, buffer->iov, buffer->count);
if (bytes_written == -1) {
perror("写入日志失败");
return -1;
}
printf(" 成功写入 %zd 字节\n", bytes_written);
// 清理内存
for (int i = 0; i < buffer->count; i++) {
free(buffer->iov[i].iov_base);
}
buffer->count = 0;
buffer->total_size = 0;
return 0;
}
/**
* 演示日志系统优化
*/
int demo_log_system_optimization() {
log_buffer_t log_buffer;
const int buffer_capacity = 100;
const char *log_filename = "optimized_log.txt";
printf("=== 日志系统优化演示 ===\n");
// 初始化日志缓冲区
printf("1. 初始化日志缓冲区:\n");
if (init_log_buffer(&log_buffer, buffer_capacity, log_filename) != 0) {
printf("初始化日志缓冲区失败\n");
return -1;
}
// 生成测试日志条目
printf("\n2. 生成测试日志条目:\n");
const char *modules[] = {"Database", "Network", "Cache", "Security", "Monitor"};
const char *levels[] = {"DEBUG", "INFO", "WARN", "ERROR"};
const char *messages[] = {
"Operation completed successfully",
"Data synchronized with remote server",
"Cache hit ratio improved to 95%",
"Security scan completed without issues",
"Performance metrics updated"
};
srand(time(NULL));
// 模拟日志生成
printf(" 生成日志条目:\n");
for (int i = 0; i < 15; i++) {
log_entry_t entry;
entry.timestamp = time(NULL);
entry.level = levels[rand() % 4];
entry.module = modules[rand() % 5];
entry.message = messages[rand() % 5];
entry.pid = getpid();
entry.tid = pthread_self();
if (add_log_entry(&log_buffer, &entry) == 0) {
printf(" [%s] [%s] %s\n", entry.level, entry.module, entry.message);
} else {
printf(" 添加日志条目失败\n");
break;
}
// 模拟批量冲刷
if ((i + 1) % 5 == 0) {
printf(" 批量冲刷日志 (%d 条目)\n", log_buffer.count);
flush_log_buffer(&log_buffer);
}
}
// 最终冲刷
printf("\n3. 最终冲刷剩余日志:\n");
flush_log_buffer(&log_buffer);
// 验证日志文件
printf("\n4. 验证日志文件:\n");
int fd = open(log_filename, O_RDONLY);
if (fd != -1) {
char buffer[2048];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 日志文件内容 (%zd 字节):\n", bytes_read);
// 只显示前几行
char *line = strtok(buffer, "\n");
int line_count = 0;
while (line && line_count < 10) {
printf(" %s\n", line);
line = strtok(NULL, "\n");
line_count++;
}
if (line) {
printf(" ... (还有更多日志)\n");
}
}
close(fd);
unlink(log_filename);
}
// 清理资源
free(log_buffer.iov);
close(log_buffer.fd);
// 显示优化效果
printf("\n=== 日志系统优化效果 ===\n");
printf("1. 性能提升:\n");
printf(" ✓ 减少系统调用次数\n");
printf(" ✓ 提高批量写入效率\n");
printf(" ✓ 降低I/O开销\n");
printf("\n2. 资源优化:\n");
printf(" ✓ 减少内存分配次数\n");
printf(" ✓ 优化缓冲区使用\n");
printf(" ✓ 提高磁盘I/O效率\n");
printf("\n3. 可靠性提升:\n");
printf(" ✓ 原子性日志写入\n");
printf(" ✓ 错误处理机制\n");
printf(" ✓ 数据完整性保证\n");
return 0;
}
int main() {
return demo_log_system_optimization();
}
示例5:数据库事务日志
#include <sys/uio.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <sys/stat.h>
/**
* 事务操作类型
*/
typedef enum {
TXN_BEGIN = 1,
TXN_COMMIT = 2,
TXN_ROLLBACK = 3,
TXN_INSERT = 4,
TXN_UPDATE = 5,
TXN_DELETE = 6
} txn_operation_t;
/**
* 事务日志条目
*/
typedef struct {
txn_operation_t operation;
time_t timestamp;
unsigned long transaction_id;
const char *table_name;
const char *data;
size_t data_size;
} txn_log_entry_t;
/**
* 事务日志管理器
*/
typedef struct {
int log_fd;
char log_filename[256];
struct iovec *iov_buffer;
int buffer_capacity;
int buffer_count;
size_t buffer_size;
unsigned long current_txn_id;
} txn_log_manager_t;
/**
* 初始化事务日志管理器
*/
int init_txn_log_manager(txn_log_manager_t *manager, const char *log_dir) {
// 创建日志目录
struct stat st = {0};
if (stat(log_dir, &st) == -1) {
if (mkdir(log_dir, 0755) == -1) {
perror("创建日志目录失败");
return -1;
}
}
// 初始化管理器
snprintf(manager->log_filename, sizeof(manager->log_filename),
"%s/transaction.log", log_dir);
manager->log_fd = open(manager->log_filename,
O_CREAT | O_WRONLY | O_APPEND, 0644);
if (manager->log_fd == -1) {
perror("创建事务日志文件失败");
return -1;
}
manager->buffer_capacity = 64;
manager->iov_buffer = malloc(manager->buffer_capacity * sizeof(struct iovec));
if (!manager->iov_buffer) {
close(manager->log_fd);
return -1;
}
manager->buffer_count = 0;
manager->buffer_size = 0;
manager->current_txn_id = time(NULL); // 简单的事务ID生成
printf("事务日志管理器初始化完成:\n");
printf(" 日志文件: %s\n", manager->log_filename);
printf(" 缓冲区容量: %d\n", manager->buffer_capacity);
printf(" 初始事务ID: %lu\n", manager->current_txn_id);
return 0;
}
/**
* 格式化事务日志条目
*/
char* format_txn_log_entry(const txn_log_entry_t *entry) {
char *buffer = malloc(512);
if (!buffer) {
return NULL;
}
struct tm *tm_info = localtime(&entry->timestamp);
char time_str[32];
strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", tm_info);
const char *op_names[] = {
"", "BEGIN", "COMMIT", "ROLLBACK", "INSERT", "UPDATE", "DELETE"
};
snprintf(buffer, 512, "[%s] TXN:%lu OP:%s TABLE:%s SIZE:%zu DATA:%s\n",
time_str, entry->transaction_id,
op_names[entry->operation],
entry->table_name ? entry->table_name : "N/A",
entry->data_size,
entry->data ? entry->data : "");
return buffer;
}
/**
* 添加事务日志条目
*/
int add_txn_log_entry(txn_log_manager_t *manager, const txn_log_entry_t *entry) {
if (manager->buffer_count >= manager->buffer_capacity) {
printf("事务日志缓冲区已满,需要冲刷\n");
if (flush_txn_log_buffer(manager) != 0) {
return -1;
}
}
char *formatted_entry = format_txn_log_entry(entry);
if (!formatted_entry) {
return -1;
}
manager->iov_buffer[manager->buffer_count].iov_base = formatted_entry;
manager->iov_buffer[manager->buffer_count].iov_len = strlen(formatted_entry);
manager->buffer_size += manager->iov_buffer[manager->buffer_count].iov_len;
manager->buffer_count++;
printf("添加事务日志条目: TXN:%lu OP:%d\n",
entry->transaction_id, entry->operation);
return 0;
}
/**
* 冲刷事务日志缓冲区
*/
int flush_txn_log_buffer(txn_log_manager_t *manager) {
if (manager->buffer_count == 0) {
return 0;
}
printf("冲刷事务日志缓冲区: %d 条目, %zu 字节\n",
manager->buffer_count, manager->buffer_size);
ssize_t bytes_written = writev(manager->log_fd, manager->iov_buffer, manager->buffer_count);
if (bytes_written == -1) {
perror("写入事务日志失败");
return -1;
}
printf(" 成功写入 %zd 字节事务日志\n", bytes_written);
// 清理内存
for (int i = 0; i < manager->buffer_count; i++) {
free(manager->iov_buffer[i].iov_base);
}
manager->buffer_count = 0;
manager->buffer_size = 0;
// 同步到磁盘
fsync(manager->log_fd);
return 0;
}
/**
* 演示数据库事务日志
*/
int demo_database_transaction_log() {
txn_log_manager_t log_manager;
const char *log_directory = "./txn_logs";
printf("=== 数据库事务日志演示 ===\n");
// 初始化事务日志管理器
printf("1. 初始化事务日志管理器:\n");
if (init_txn_log_manager(&log_manager, log_directory) != 0) {
printf("初始化事务日志管理器失败\n");
return -1;
}
// 模拟数据库事务操作
printf("\n2. 模拟数据库事务操作:\n");
// 事务1: 插入操作
printf(" 事务1: 数据插入操作\n");
txn_log_entry_t insert_entry = {
.operation = TXN_BEGIN,
.timestamp = time(NULL),
.transaction_id = log_manager.current_txn_id++,
.table_name = "users",
.data = "{'name':'John','email':'john@example.com'}",
.data_size = 45
};
add_txn_log_entry(&log_manager, &insert_entry);
txn_log_entry_t insert_data = {
.operation = TXN_INSERT,
.timestamp = time(NULL),
.transaction_id = insert_entry.transaction_id,
.table_name = "users",
.data = "INSERT INTO users VALUES ('John', 'john@example.com')",
.data_size = 52
};
add_txn_log_entry(&log_manager, &insert_data);
txn_log_entry_t commit_entry = {
.operation = TXN_COMMIT,
.timestamp = time(NULL),
.transaction_id = insert_entry.transaction_id,
.table_name = "users",
.data = "Transaction committed successfully",
.data_size = 32
};
add_txn_log_entry(&log_manager, &commit_entry);
// 冲刷第一个事务
flush_txn_log_buffer(&log_manager);
// 事务2: 更新操作
printf("\n 事务2: 数据更新操作\n");
txn_log_entry_t update_entry = {
.operation = TXN_BEGIN,
.timestamp = time(NULL),
.transaction_id = log_manager.current_txn_id++,
.table_name = "orders",
.data = "{'order_id':12345,'status':'processing'}",
.data_size = 42
};
add_txn_log_entry(&log_manager, &update_entry);
txn_log_entry_t update_data = {
.operation = TXN_UPDATE,
.timestamp = time(NULL),
.transaction_id = update_entry.transaction_id,
.table_name = "orders",
.data = "UPDATE orders SET status='shipped' WHERE order_id=12345",
.data_size = 55
};
add_txn_log_entry(&log_manager, &update_data);
// 事务3: 删除操作(在同一事务中)
txn_log_entry_t delete_data = {
.operation = TXN_DELETE,
.timestamp = time(NULL),
.transaction_id = update_entry.transaction_id,
.table_name = "cart_items",
.data = "DELETE FROM cart_items WHERE user_id=1001",
.data_size = 43
};
add_txn_log_entry(&log_manager, &delete_data);
txn_log_entry_t commit_update = {
.operation = TXN_COMMIT,
.timestamp = time(NULL),
.transaction_id = update_entry.transaction_id,
.table_name = "orders",
.data = "Multi-table transaction committed",
.data_size = 35
};
add_txn_log_entry(&log_manager, &commit_update);
// 事务4: 回滚操作
printf("\n 事务3: 事务回滚操作\n");
txn_log_entry_t rollback_entry = {
.operation = TXN_BEGIN,
.timestamp = time(NULL),
.transaction_id = log_manager.current_txn_id++,
.table_name = "inventory",
.data = "Stock adjustment transaction",
.data_size = 28
};
add_txn_log_entry(&log_manager, &rollback_entry);
txn_log_entry_t error_entry = {
.operation = TXN_UPDATE,
.timestamp = time(NULL),
.transaction_id = rollback_entry.transaction_id,
.table_name = "inventory",
.data = "UPDATE inventory SET quantity=-5 WHERE product_id=100", // 错误:负库存
.data_size = 57
};
add_txn_log_entry(&log_manager, &error_entry);
txn_log_entry_t rollback_txn = {
.operation = TXN_ROLLBACK,
.timestamp = time(NULL),
.transaction_id = rollback_entry.transaction_id,
.table_name = "inventory",
.data = "Rollback due to negative stock adjustment",
.data_size = 45
};
add_txn_log_entry(&log_manager, &rollback_txn);
// 最终冲刷
printf("\n3. 最终冲刷剩余日志:\n");
flush_txn_log_buffer(&log_manager);
// 验证日志文件
printf("\n4. 验证事务日志文件:\n");
int fd = open(log_manager.log_filename, O_RDONLY);
if (fd != -1) {
char buffer[2048];
ssize_t bytes_read = read(fd, buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0';
printf(" 事务日志内容 (%zd 字节):\n", bytes_read);
// 显示日志内容
char *line = strtok(buffer, "\n");
while (line) {
printf(" %s\n", line);
line = strtok(NULL, "\n");
}
}
close(fd);
}
// 清理资源
if (log_manager.iov_buffer) {
// 清理可能残留的缓冲区条目
for (int i = 0; i < log_manager.buffer_count; i++) {
free(log_manager.iov_buffer[i].iov_base);
}
free(log_manager.iov_buffer);
}
if (log_manager.log_fd != -1) {
close(log_manager.log_fd);
}
// 清理日志文件
unlink(log_manager.log_filename);
rmdir(log_directory);
// 显示事务日志优势
printf("\n=== 事务日志优势 ===\n");
printf("1. ACID特性保证:\n");
printf(" ✓ 原子性: 事务操作要么全部成功要么全部失败\n");
printf(" ✓ 一致性: 系统从一个一致状态转换到另一个一致状态\n");
printf(" ✓ 隔离性: 并发事务之间相互隔离\n");
printf(" ✓ 持久性: 事务提交后对数据的修改是永久性的\n");
printf("\n2. 性能优化:\n");
printf(" ✓ 批量日志写入减少I/O操作\n");
printf(" ✓ 零拷贝数据组装提高效率\n");
printf(" ✓ 缓冲机制减少系统调用开销\n");
printf(" ✓ 异步写入提高响应速度\n");
printf("\n3. 可靠性保障:\n");
printf(" ✓ 完整的事务轨迹记录\n");
printf(" ✓ 快速故障恢复能力\n");
printf(" ✓ 数据一致性验证\n");
printf(" ✓ 审计和合规支持\n");
return 0;
}
int main() {
return demo_database_transaction_log();
}
writev 使用注意事项
系统限制:
- IOV_MAX: 系统限制iovec数组的最大长度(通常为1024)
- 单次写入限制: 一次writev调用写入的总字节数有限制
- 文件描述符类型: 不是所有文件描述符都支持writev
错误处理:
- 部分写入: 可能只写入部分数据
- 错误恢复: 需要处理部分成功的场景
- 资源清理: 失败时需要清理已分配的资源
性能考虑:
- 缓冲区大小: 合理设置iovec数组大小
- 内存对齐: 考虑内存对齐优化
- 批量操作: 尽量批量处理减少系统调用
安全考虑:
- 缓冲区验证: 验证iovec缓冲区的有效性
- 权限检查: 确保有适当的文件写入权限
- 数据完整性: 确保写入数据的完整性
最佳实践:
- 合理使用: 在合适的场景下使用writev
- 错误处理: 妥善处理各种错误情况
- 资源管理: 及时释放分配的资源
- 性能监控: 监控writev的性能表现
writev vs 相似函数对比
writev vs write:
// write: 单缓冲区写入
char buffer[1024];
write(fd, buffer, sizeof(buffer));
// writev: 多缓冲区写入
struct iovec iov[3];
iov[0].iov_base = buffer1; iov[0].iov_len = len1;
iov[1].iov_base = buffer2; iov[1].iov_len = len2;
iov[2].iov_base = buffer3; iov[2].iov_len = len3;
writev(fd, iov, 3);
writev vs sendmsg:
// writev: 简单的分散写入
writev(sockfd, iov, iovcnt);
// sendmsg: 带控制信息的分散写入
struct msghdr msg = {0};
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_control = control_data;
sendmsg(sockfd, &msg, flags);
常见使用场景
1. 网络协议组装:
// HTTP响应组装
struct iovec iov[5];
iov[0].iov_base = status_line; iov[0].iov_len = strlen(status_line);
iov[1].iov_base = headers; iov[1].iov_len = strlen(headers);
iov[2].iov_base = blank_line; iov[2].iov_len = 2;
iov[3].iov_base = content; iov[3].iov_len = content_len;
iov[4].iov_base = trailers; iov[4].iov_len = strlen(trailers);
writev(client_fd, iov, 5);
2. 日志系统优化:
// 批量日志写入
struct iovec log_entries[100];
// ... 填充日志条目
writev(log_fd, log_entries, entry_count);
3. 数据库事务日志:
// 事务操作日志
struct iovec txn_records[10];
// ... 填充事务记录
writev(txn_log_fd, txn_records, record_count);
总结
writev
是Linux系统中重要的分散写入函数,提供了:
- 高效性: 减少系统调用次数,提高I/O性能
- 灵活性: 支持多个不连续缓冲区的原子写入
- 标准兼容: 符合POSIX标准,广泛支持
- 应用场景广: 适用于网络编程、日志系统、数据库等场景
通过合理使用 writev
,可以构建高性能的数据处理和传输系统。在实际应用中,需要注意系统限制、错误处理和性能优化等问题。