vmsplice系统调用及示例

vmsplice函数详解

  1. 函数介绍

vmsplice函数是Linux系统中用于高效地将用户空间缓冲区数据传输到管道的函数。它是splice系列函数的一部分,专门用于从用户空间向管道传输数据。可以把vmsplice想象成一个”高速数据管道注入器”,它能够将内存中的数据块直接传输到管道中,而无需传统的数据拷贝操作。

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

vmsplice的主要优势在于它提供了零拷贝或最小拷贝的数据传输机制,通过将用户空间的内存页直接映射到内核空间,大大提高了数据传输的效率。这在需要大量数据传输的高性能应用中特别有用。

使用场景:

  • 高性能网络服务器的数据传输

  • 大数据处理和流处理应用

  • 实时系统的数据管道

  • 避免内存拷贝的高效数据传输

  • 管道和套接字之间的数据传输

  1. 函数原型
1
2
3
4
5
6
#define _GNU_SOURCE
#include <fcntl.h>
#include <sys/uio.h>

ssize_t vmsplice(int fd, const struct iovec *iov, unsigned long nr_segs, unsigned int flags);

  1. 功能

vmsplice函数的主要功能是将用户空间缓冲区的数据高效地传输到管道中。它支持多种传输模式和选项,可以优化不同场景下的数据传输性能。

  1. 参数

fd: 管道文件描述符

  • 类型:int

  • 含义:目标管道的写端文件描述符

iov: 输入输出向量数组

  • 类型:const struct iovec*

  • 含义:描述要传输的数据缓冲区的向量数组

  • 结构体定义:struct iovec { void *iov_base; // 缓冲区起始地址 size_t iov_len; // 缓冲区长度 };

nr_segs: 向量数组元素个数

  • 类型:unsigned long

  • 含义:iov数组中有效元素的个数

flags: 操作标志

  • 类型:unsigned int

  • 含义:控制传输行为的标志位

常用值:

  • SPLICE_F_MOVE:尽可能移动页面而不是复制

  • SPLICE_F_NONBLOCK:非阻塞操作

  • SPLICE_F_MORE:提示还有更多数据要写入

  • SPLICE_F_GIFT:页面是礼品(传输后内核拥有页面)

  1. 返回值
  • 成功: 返回实际传输的字节数

失败: 返回-1,并设置errno错误码

  • EAGAIN:非阻塞模式下无法立即完成

  • EBADF:无效的文件描述符

  • EINVAL:参数无效

  • ENOMEM:内存不足

  • EPIPE:管道已关闭读端

  1. 相似函数或关联函数
  • splice(): 在文件描述符之间传输数据

  • tee(): 在管道之间复制数据

  • write(): 传统的数据写入函数

  • writev(): 向量写入函数

  • mmap(): 内存映射函数

  • sendfile(): 文件到套接字的高效传输

  1. 示例代码

示例1:基础vmsplice使用 - 简单数据传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <string.h>
#include <errno.h>

// 创建管道
int create_pipe(int pipefd&#91;2]) {
if (pipe(pipefd) == -1) {
perror("创建管道失败");
return -1;
}
printf("创建管道: 读端=%d, 写端=%d\n", pipefd&#91;0], pipefd&#91;1]);
return 0;
}

// 使用vmsplice传输数据
ssize_t send_data_with_vmsplice(int pipe_write_fd, const char* data, size_t data_len) {
struct iovec iov;
iov.iov_base = (void*)data;
iov.iov_len = data_len;

ssize_t result = vmsplice(pipe_write_fd, &iov, 1, SPLICE_F_MOVE);
if (result == -1) {
perror("vmsplice传输失败");
} else {
printf("vmsplice传输成功: %zd 字节\n", result);
}

return result;
}

// 从管道接收数据
ssize_t receive_data_from_pipe(int pipe_read_fd, char* buffer, size_t buffer_size) {
ssize_t bytes_read = read(pipe_read_fd, buffer, buffer_size - 1);
if (bytes_read > 0) {
buffer&#91;bytes_read] = '\0';
printf("从管道读取: %zd 字节\n", bytes_read);
} else if (bytes_read == -1) {
perror("从管道读取失败");
}

return bytes_read;
}

int main() {
printf("=== 基础vmsplice使用示例 ===\n");

int pipefd&#91;2];

// 创建管道
if (create_pipe(pipefd) == -1) {
exit(EXIT_FAILURE);
}

// 准备测试数据
const char* test_data = "这是使用vmsplice传输的测试数据";
size_t data_len = strlen(test_data);

printf("1. 准备传输数据:\n");
printf(" 数据内容: %s\n", test_data);
printf(" 数据长度: %zu 字节\n", data_len);

// 使用vmsplice传输数据
printf("\n2. 使用vmsplice传输数据:\n");
ssize_t bytes_sent = send_data_with_vmsplice(pipefd&#91;1], test_data, data_len);

if (bytes_sent > 0) {
// 从管道接收数据
printf("\n3. 从管道接收数据:\n");
char receive_buffer&#91;256];
ssize_t bytes_received = receive_data_from_pipe(pipefd&#91;0], receive_buffer, sizeof(receive_buffer));

if (bytes_received > 0) {
printf(" 接收内容: %s\n", receive_buffer);
printf(" 验证结果: %s\n",
strcmp(test_data, receive_buffer) == 0 ? "✓ 数据一致" : "✗ 数据不一致");
}
}

// 演示向量传输
printf("\n4. 演示向量传输:\n");

// 准备多个数据段
const char* segments&#91;] = {
"第一段数据",
"第二段数据",
"第三段数据"
};

size_t segment_count = sizeof(segments) / sizeof(segments&#91;0]);
struct iovec iov&#91;segment_count];

// 初始化向量数组
for (size_t i = 0; i < segment_count; i++) {
iov&#91;i].iov_base = (void*)segments&#91;i];
iov&#91;i].iov_len = strlen(segments&#91;i]);
printf(" 段 %zu: %s (%zu 字节)\n", i + 1, segments&#91;i], strlen(segments&#91;i]));
}

// 使用vmsplice传输多个段
ssize_t vector_result = vmsplice(pipefd&#91;1], iov, segment_count, SPLICE_F_MOVE);
if (vector_result == -1) {
perror("向量传输失败");
} else {
printf(" 向量传输成功: %zd 字节\n", vector_result);

// 接收并验证数据
char vector_buffer&#91;256];
ssize_t total_received = 0;
printf(" 接收数据:\n");

while (total_received < vector_result) {
ssize_t bytes_read = read(pipefd&#91;0], vector_buffer + total_received,
sizeof(vector_buffer) - total_received - 1);
if (bytes_read > 0) {
total_received += bytes_read;
} else if (bytes_read == 0) {
break; // 管道已关闭
} else {
perror("读取数据失败");
break;
}
}

if (total_received > 0) {
vector_buffer&#91;total_received] = '\0';
printf(" 接收内容: %s\n", vector_buffer);
}
}

// 演示错误处理
printf("\n5. 错误处理演示:\n");

// 使用无效的文件描述符
struct iovec invalid_iov = {(void*)"test", 4};
ssize_t error_result = vmsplice(999, &invalid_iov, 1, 0);
if (error_result == -1) {
printf(" 使用无效fd: %s (预期错误)\n", strerror(errno));
}

// 使用空向量
error_result = vmsplice(pipefd&#91;1], NULL, 0, 0);
if (error_result == -1) {
printf(" 使用空向量: %s (预期错误)\n", strerror(errno));
}

// 清理资源
printf("\n6. 清理资源:\n");
close(pipefd&#91;0]);
close(pipefd&#91;1]);
printf(" 管道已关闭\n");

printf("\n=== 基础vmsplice演示完成 ===\n");

return 0;
}

示例2:高性能数据管道应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/wait.h>
#include <string.h>
#include <errno.h>
#include <time.h>

#define BUFFER_SIZE 4096
#define NUM_BUFFERS 100

// 生产者进程 - 使用vmsplice发送数据
void producer_process(int pipe_write_fd) {
printf("生产者进程启动 (PID: %d)\n", getpid());

// 准备数据缓冲区
char** buffers = malloc(NUM_BUFFERS * sizeof(char*));
if (!buffers) {
perror("分配缓冲区失败");
exit(EXIT_FAILURE);
}

// 初始化缓冲区
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers&#91;i] = malloc(BUFFER_SIZE);
if (!buffers&#91;i]) {
perror("分配缓冲区失败");
exit(EXIT_FAILURE);
}

// 填充缓冲区内容
snprintf(buffers&#91;i], BUFFER_SIZE,
"生产者数据包 %d - 时间戳: %ld", i + 1, time(NULL));
}

struct iovec iov;
int total_sent = 0;

// 发送数据包
for (int i = 0; i < NUM_BUFFERS; i++) {
iov.iov_base = buffers&#91;i];
iov.iov_len = strlen(buffers&#91;i]) + 1; // 包括字符串结束符

// 使用vmsplice发送数据
ssize_t bytes_sent = vmsplice(pipe_write_fd, &iov, 1,
SPLICE_F_MOVE | SPLICE_F_MORE);

if (bytes_sent == -1) {
if (errno == EAGAIN) {
printf("生产者: 管道满,等待...\n");
usleep(1000); // 等待1毫秒后重试
i--; // 重试当前数据包
continue;
} else {
perror("生产者: vmsplice发送失败");
break;
}
} else {
total_sent += bytes_sent;
if ((i + 1) % 20 == 0) {
printf("生产者: 已发送 %d 个数据包 (%d 字节)\n", i + 1, total_sent);
}
}

// 模拟处理时间
if (i % 10 == 0) {
usleep(10000); // 10毫秒
}
}

printf("生产者: 总共发送 %d 字节\n", total_sent);

// 清理缓冲区
for (int i = 0; i < NUM_BUFFERS; i++) {
free(buffers&#91;i]);
}
free(buffers);

printf("生产者进程完成\n");
}

// 消费者进程 - 从管道接收数据
void consumer_process(int pipe_read_fd) {
printf("消费者进程启动 (PID: %d)\n", getpid());

char buffer&#91;BUFFER_SIZE];
int total_received = 0;
int packet_count = 0;

clock_t start_time = clock();

// 接收数据
while (packet_count < NUM_BUFFERS) {
ssize_t bytes_received = read(pipe_read_fd, buffer, sizeof(buffer) - 1);

if (bytes_received > 0) {
buffer&#91;bytes_received] = '\0';
total_received += bytes_received;
packet_count++;

if (packet_count % 20 == 0) {
printf("消费者: 已接收 %d 个数据包 (%d 字节)\n",
packet_count, total_received);
}
} else if (bytes_received == 0) {
printf("消费者: 管道已关闭\n");
break;
} else {
if (errno == EAGAIN) {
usleep(1000); // 等待1毫秒后重试
continue;
} else {
perror("消费者: 读取数据失败");
break;
}
}
}

clock_t end_time = clock();
double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;

printf("消费者: 总共接收 %d 个数据包, %d 字节\n", packet_count, total_received);
printf("消费者: 耗时 %.3f 秒, 吞吐量 %.2f KB/s\n",
elapsed_time, (total_received / 1024.0) / elapsed_time);

printf("消费者进程完成\n");
}

// 性能测试函数
void performance_test() {
printf("=== 性能测试 ===\n");

int pipefd&#91;2];
if (pipe(pipefd) == -1) {
perror("创建管道失败");
return;
}

// 设置管道为非阻塞模式
int flags = fcntl(pipefd&#91;1], F_GETFL);
fcntl(pipefd&#91;1], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(pipefd&#91;0], F_GETFL);
fcntl(pipefd&#91;0], F_SETFL, flags | O_NONBLOCK);

// 准备大量测试数据
char* large_data = malloc(1024 * 1024); // 1MB数据
if (!large_data) {
perror("分配测试数据失败");
close(pipefd&#91;0]);
close(pipefd&#91;1]);
return;
}

memset(large_data, 'A', 1024 * 1024);
large_data&#91;1024 * 1024 - 1] = '\0';

// 测试vmsplice性能
clock_t start_time = clock();

struct iovec iov;
iov.iov_base = large_data;
iov.iov_len = 1024 * 1024;

ssize_t bytes_sent = vmsplice(pipefd&#91;1], &iov, 1, SPLICE_F_MOVE);

clock_t end_time = clock();
double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;

if (bytes_sent > 0) {
printf("vmsplice传输 %zd 字节\n", bytes_sent);
printf("耗时: %.6f 秒\n", elapsed_time);
printf("吞吐量: %.2f MB/s\n", (bytes_sent / (1024.0 * 1024.0)) / elapsed_time);
} else {
printf("vmsplice传输失败: %s\n", strerror(errno));
}

// 测试传统write性能(对比)
lseek(pipefd&#91;0], 0, SEEK_SET); // 清空管道
lseek(pipefd&#91;1], 0, SEEK_SET);

start_time = clock();
ssize_t write_result = write(pipefd&#91;1], large_data, 1024 * 1024);
end_time = clock();
elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;

if (write_result > 0) {
printf("write传输 %zd 字节\n", write_result);
printf("耗时: %.6f 秒\n", elapsed_time);
printf("吞吐量: %.2f MB/s\n", (write_result / (1024.0 * 1024.0)) / elapsed_time);
}

free(large_data);
close(pipefd&#91;0]);
close(pipefd&#91;1]);

printf("=== 性能测试完成 ===\n\n");
}

int main() {
printf("=== 高性能数据管道应用示例 ===\n");

// 执行性能测试
performance_test();

// 创建管道用于进程间通信
int pipefd&#91;2];
if (pipe(pipefd) == -1) {
perror("创建管道失败");
exit(EXIT_FAILURE);
}

printf("创建数据管道: 读端=%d, 写端=%d\n", pipefd&#91;0], pipefd&#91;1]);

// 启动生产者进程
pid_t producer_pid = fork();
if (producer_pid == 0) {
// 生产者子进程
close(pipefd&#91;0]); // 关闭读端
producer_process(pipefd&#91;1]);
close(pipefd&#91;1]);
exit(EXIT_SUCCESS);
} else if (producer_pid == -1) {
perror("创建生产者进程失败");
close(pipefd&#91;0]);
close(pipefd&#91;1]);
exit(EXIT_FAILURE);
}

// 启动消费者进程
pid_t consumer_pid = fork();
if (consumer_pid == 0) {
// 消费者子进程
close(pipefd&#91;1]); // 关闭写端
consumer_process(pipefd&#91;0]);
close(pipefd&#91;0]);
exit(EXIT_SUCCESS);
} else if (consumer_pid == -1) {
perror("创建消费者进程失败");
close(pipefd&#91;0]);
close(pipefd&#91;1]);
exit(EXIT_FAILURE);
}

// 父进程关闭两端并等待子进程完成
close(pipefd&#91;0]);
close(pipefd&#91;1]);

printf("主进程等待子进程完成...\n");
waitpid(producer_pid, NULL, 0);
waitpid(consumer_pid, NULL, 0);

printf("所有进程已完成\n");

printf("\n=== 高性能数据管道应用演示完成 ===\n");

return 0;
}

示例3:vmsplice与内存映射结合使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/mman.h>
#include <string.h>
#include <errno.h>
#include <time.h>

#define SHARED_MEMORY_SIZE (1024 * 1024) // 1MB
#define PACKET_SIZE 1024

// 共享内存结构
typedef struct {
volatile int write_index;
volatile int read_index;
volatile int data_ready;
char data&#91;SHARED_MEMORY_SIZE - sizeof(int) * 3];
} shared_memory_t;

// 使用vmsplice发送内存映射数据
ssize_t send_mmap_data_with_vmsplice(int pipe_fd, const void* data, size_t data_size) {
struct iovec iov;
iov.iov_base = (void*)data;
iov.iov_len = data_size;

// 使用SPLICE_F_GIFT标志,表示传输后内核拥有页面
ssize_t result = vmsplice(pipe_fd, &iov, 1, SPLICE_F_MOVE | SPLICE_F_GIFT);

if (result == -1) {
printf("vmsplice发送失败: %s\n", strerror(errno));
} else {
printf("vmsplice发送成功: %zd 字节\n", result);
}

return result;
}

// 创建测试数据文件
int create_test_data_file(const char* filename, size_t size) {
int fd = open(filename, O_CREAT | O_WRONLY | O_TRUNC, 0644);
if (fd == -1) {
perror("创建测试文件失败");
return -1;
}

// 填充测试数据
char* buffer = malloc(4096);
if (!buffer) {
close(fd);
return -1;
}

for (size_t i = 0; i < size; i += 4096) {
size_t write_size = (size - i > 4096) ? 4096 : (size - i);
memset(buffer, 'A' + (i / 4096) % 26, write_size - 1);
buffer&#91;write_size - 1] = '\n';

if (write(fd, buffer, write_size) != (ssize_t)write_size) {
perror("写入测试数据失败");
free(buffer);
close(fd);
return -1;
}
}

free(buffer);
printf("创建测试文件: %s (%zu 字节)\n", filename, size);
return fd;
}

// 演示文件到管道的高效传输
void demonstrate_file_to_pipe_transfer() {
printf("=== 文件到管道传输演示 ===\n");

const char* test_file = "vmsplice_test_data.txt";
const size_t file_size = 64 * 1024; // 64KB

// 创建测试数据文件
int file_fd = create_test_data_file(test_file, file_size);
if (file_fd == -1) {
return;
}

// 创建管道
int pipefd&#91;2];
if (pipe(pipefd) == -1) {
perror("创建管道失败");
close(file_fd);
unlink(test_file);
return;
}

// 内存映射文件
char* mapped_data = mmap(NULL, file_size, PROT_READ, MAP_PRIVATE, file_fd, 0);
if (mapped_data == MAP_FAILED) {
perror("内存映射文件失败");
close(file_fd);
close(pipefd&#91;0]);
close(pipefd&#91;1]);
unlink(test_file);
return;
}

printf("文件内存映射成功: %p (%zu 字节)\n", mapped_data, file_size);

// 使用vmsplice传输映射的数据
printf("使用vmsplice传输映射数据...\n");

clock_t start_time = clock();
ssize_t bytes_sent = send_mmap_data_with_vmsplice(pipefd&#91;1], mapped_data, file_size);
clock_t end_time = clock();

if (bytes_sent > 0) {
double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("传输完成: %zd 字节\n", bytes_sent);
printf("耗时: %.6f 秒\n", elapsed_time);
printf("吞吐量: %.2f MB/s\n", (bytes_sent / (1024.0 * 1024.0)) / elapsed_time);

// 验证数据传输
printf("验证数据传输...\n");
char* verify_buffer = malloc(file_size);
if (verify_buffer) {
ssize_t bytes_received = read(pipefd&#91;0], verify_buffer, file_size);
if (bytes_received > 0) {
printf("验证接收: %zd 字节\n", bytes_received);
if (bytes_received == bytes_sent) {
printf("✓ 数据传输验证通过\n");
} else {
printf("✗ 数据传输验证失败\n");
}
}
free(verify_buffer);
}
}

// 清理资源
munmap(mapped_data, file_size);
close(file_fd);
close(pipefd&#91;0]);
close(pipefd&#91;1]);
unlink(test_file);

printf("=== 文件传输演示完成 ===\n\n");
}

// 高级vmsplice特性演示
void advanced_vmsplice_features() {
printf("=== 高级vmsplice特性演示 ===\n");

int pipefd&#91;2];
if (pipe(pipefd) == -1) {
perror("创建管道失败");
return;
}

// 演示不同标志的使用
printf("1. 不同标志演示:\n");

const char* test_message = "测试消息数据";
struct iovec iov;
iov.iov_base = (void*)test_message;
iov.iov_len = strlen(test_message);

// SPLICE_F_MOVE 标志
printf(" 使用 SPLICE_F_MOVE 标志:\n");
ssize_t result = vmsplice(pipefd&#91;1], &iov, 1, SPLICE_F_MOVE);
if (result > 0) {
printf(" ✓ 传输成功: %zd 字节\n", result);

// 读取验证
char buffer&#91;256];
ssize_t bytes_read = read(pipefd&#91;0], buffer, sizeof(buffer) - 1);
if (bytes_read > 0) {
buffer&#91;bytes_read] = '\0';
printf(" 接收数据: %s\n", buffer);
}
}

// SPLICE_F_MORE 标志(提示还有更多数据)
printf(" 使用 SPLICE_F_MORE 标志:\n");
const char* more_data = "更多数据";
iov.iov_base = (void*)more_data;
iov.iov_len = strlen(more_data);

result = vmsplice(pipefd&#91;1], &iov, 1, SPLICE_F_MORE);
if (result > 0) {
printf(" ✓ 传输成功: %zd 字节\n", result);
}

// SPLICE_F_NONBLOCK 标志(非阻塞模式)
printf(" 使用 SPLICE_F_NONBLOCK 标志:\n");

// 设置管道为非阻塞模式
int flags = fcntl(pipefd&#91;1], F_GETFL);
fcntl(pipefd&#91;1], F_SETFL, flags | O_NONBLOCK);

const char* nonblock_data = "非阻塞数据";
iov.iov_base = (void*)nonblock_data;
iov.iov_len = strlen(nonblock_data);

result = vmsplice(pipefd&#91;1], &iov, 1, SPLICE_F_NONBLOCK);
if (result > 0) {
printf(" ✓ 非阻塞传输成功: %zd 字节\n", result);
} else if (result == -1) {
if (errno == EAGAIN) {
printf(" ✓ 非阻塞模式下暂时无法传输 (EAGAIN)\n");
} else {
printf(" ✗ 传输失败: %s\n", strerror(errno));
}
}

// 演示大数据传输
printf("\n2. 大数据传输演示:\n");

// 重置管道为阻塞模式
fcntl(pipefd&#91;1], F_SETFL, flags);

// 分配大块内存
size_t large_data_size = 64 * 1024; // 64KB
char* large_data = malloc(large_data_size);
if (large_data) {
// 填充数据
for (size_t i = 0; i < large_data_size; i++) {
large_data&#91;i] = 'A' + (i % 26);
}

iov.iov_base = large_data;
iov.iov_len = large_data_size;

clock_t start_time = clock();
result = vmsplice(pipefd&#91;1], &iov, 1, SPLICE_F_MOVE);
clock_t end_time = clock();

if (result > 0) {
double elapsed_time = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf(" 大数据传输: %zd 字节\n", result);
printf(" 耗时: %.6f 秒\n", elapsed_time);
printf(" 吞吐量: %.2f MB/s\n", (result / (1024.0 * 1024.0)) / elapsed_time);
}

free(large_data);
}

// 清理资源
close(pipefd&#91;0]);
close(pipefd&#91;1]);

printf("=== 高级特性演示完成 ===\n\n");
}

int main() {
printf("=== vmsplice与内存映射结合使用示例 ===\n");

// 演示文件到管道的高效传输
demonstrate_file_to_pipe_transfer();

// 演示高级vmsplice特性
advanced_vmsplice_features();

// 错误处理和边界情况演示
printf("=== 错误处理演示 ===\n");

// 使用无效参数
printf("1. 无效参数测试:\n");

ssize_t result = vmsplice(-1, NULL, 0, 0);
if (result == -1) {
printf(" 无效文件描述符: %s (预期)\n", strerror(errno));
}

// 使用空向量
int dummy_pipe&#91;2];
if (pipe(dummy_pipe) == 0) {
result = vmsplice(dummy_pipe&#91;1], NULL, 0, 0);
if (result == -1) {
printf(" 空向量: %s (预期)\n", strerror(errno));
}
close(dummy_pipe&#91;0]);
close(dummy_pipe&#91;1]);
}

// 使用无效标志
if (pipe(dummy_pipe) == 0) {
struct iovec iov = {(void*)"test", 4};
result = vmsplice(dummy_pipe&#91;1], &iov, 1, 0xFFFFFFFF);
if (result == -1) {
printf(" 无效标志: %s (预期)\n", strerror(errno));
}
close(dummy_pipe&#91;0]);
close(dummy_pipe&#91;1]);
}

printf("=== 错误处理演示完成 ===\n");

printf("\n=== vmsplice综合演示完成 ===\n");

return 0;
}

示例4:实际应用场景 - 网络数据转发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <pthread.h>

#define BUFFER_SIZE 8192
#define MAX_PACKETS 1000

// 数据包结构
typedef struct {
char data&#91;BUFFER_SIZE];
size_t length;
int packet_id;
time_t timestamp;
} data_packet_t;

// 转发器统计信息
typedef struct {
volatile long long packets_forwarded;
volatile long long bytes_forwarded;
volatile long long packets_dropped;
time_t start_time;
} forwarder_stats_t;

forwarder_stats_t stats = {0};

// 模拟网络接收缓冲区
typedef struct {
char* buffer;
size_t size;
size_t offset;
} network_buffer_t;

// 创建模拟网络缓冲区
network_buffer_t* create_network_buffer(size_t size) {
network_buffer_t* nb = malloc(sizeof(network_buffer_t));
if (!nb) return NULL;

nb->buffer = malloc(size);
if (!nb->buffer) {
free(nb);
return NULL;
}

nb->size = size;
nb->offset = 0;

// 填充模拟数据
for (size_t i = 0; i < size; i++) {
nb->buffer&#91;i] = 'A' + (i % 26);
}

return nb;
}

// 从网络缓冲区读取数据包
int read_packet_from_buffer(network_buffer_t* nb, char* packet_buffer, size_t max_size) {
if (nb->offset >= nb->size) {
return 0; // 没有更多数据
}

// 模拟不同大小的数据包
size_t packet_size = 100 + (rand() % 1000);
if (packet_size > max_size) {
packet_size = max_size;
}

if (nb->offset + packet_size > nb->size) {
packet_size = nb->size - nb->offset;
}

if (packet_size > 0) {
memcpy(packet_buffer, nb->buffer + nb->offset, packet_size);
nb->offset += packet_size;
return packet_size;
}

return 0;
}

// 使用vmsplice转发数据包
int forward_packet_with_vmsplice(int pipe_fd, const char* packet_data, size_t packet_size) {
struct iovec iov;
iov.iov_base = (void*)packet_data;
iov.iov_len = packet_size;

ssize_t result = vmsplice(pipe_fd, &iov, 1, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);

if (result > 0) {
__atomic_fetch_add(&stats.packets_forwarded, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&stats.bytes_forwarded, result, __ATOMIC_RELAXED);
return 0; // 成功
} else if (result == -1) {
if (errno == EAGAIN) {
// 管道满,数据包被丢弃
__atomic_fetch_add(&stats.packets_dropped, 1, __ATOMIC_RELAXED);
return 1; // 丢弃
} else {
perror("vmsplice转发失败");
return -1; // 错误
}
}

return -1;
}

// 数据包生成器线程
void* packet_generator_thread(void* arg) {
int pipe_write_fd = *(int*)arg;
network_buffer_t* nb = create_network_buffer(1024 * 1024); // 1MB缓冲区

if (!nb) {
printf("生成器: 创建网络缓冲区失败\n");
return NULL;
}

printf("生成器线程启动\n");

char packet_buffer&#91;BUFFER_SIZE];
int packet_count = 0;

// 生成数据包
while (packet_count < MAX_PACKETS) {
int packet_size = read_packet_from_buffer(nb, packet_buffer, sizeof(packet_buffer));

if (packet_size > 0) {
// 使用vmsplice转发数据包
int result = forward_packet_with_vmsplice(pipe_write_fd, packet_buffer, packet_size);

if (result == 0) {
packet_count++;
if (packet_count % 100 == 0) {
printf("生成器: 已生成 %d 个数据包\n", packet_count);
}
} else if (result == 1) {
printf("生成器: 数据包被丢弃 (管道满)\n");
} else {
printf("生成器: 转发错误\n");
break;
}

// 模拟网络延迟
usleep(1000); // 1毫秒
} else {
break; // 没有更多数据
}
}

free(nb->buffer);
free(nb);

printf("生成器线程完成,共生成 %d 个数据包\n", packet_count);
return NULL;
}

// 数据包处理器线程
void* packet_processor_thread(void* arg) {
int pipe_read_fd = *(int*)arg;

printf("处理器线程启动\n");

char buffer&#91;BUFFER_SIZE];
int processed_packets = 0;

// 处理数据包
while (processed_packets < MAX_PACKETS) {
ssize_t bytes_received = read(pipe_read_fd, buffer, sizeof(buffer));

if (bytes_received > 0) {
// 模拟数据包处理
processed_packets++;

if (processed_packets % 100 == 0) {
printf("处理器: 已处理 %d 个数据包\n", processed_packets);
}

// 模拟处理时间
usleep(500); // 0.5毫秒
} else if (bytes_received == 0) {
printf("处理器: 管道已关闭\n");
break;
} else {
if (errno == EAGAIN) {
usleep(1000); // 等待1毫秒后重试
continue;
} else {
perror("处理器: 读取数据失败");
break;
}
}
}

printf("处理器线程完成,共处理 %d 个数据包\n", processed_packets);
return NULL;
}

// 显示转发统计
void show_forwarding_statistics() {
time_t current_time = time(NULL);
double elapsed_time = difftime(current_time, stats.start_time);

long long packets_forwarded = __atomic_load_n(&stats.packets_forwarded, __ATOMIC_RELAXED);
long long bytes_forwarded = __atomic_load_n(&stats.bytes_forwarded, __ATOMIC_RELAXED);
long long packets_dropped = __atomic_load_n(&stats.packets_dropped, __ATOMIC_RELAXED);

printf("\n=== 转发统计 ===\n");
printf("转发数据包: %lld\n", packets_forwarded);
printf("转发字节数: %lld (%.2f MB)\n", bytes_forwarded, bytes_forwarded / (1024.0 * 1024.0));
printf("丢弃数据包: %lld\n", packets_dropped);
printf("运行时间: %.2f 秒\n", elapsed_time);

if (elapsed_time > 0) {
printf("平均转发速率: %.2f 包/秒\n", packets_forwarded / elapsed_time);
printf("平均吞吐量: %.2f MB/s\n", (bytes_forwarded / (1024.0 * 1024.0)) / elapsed_time);
}

if (packets_forwarded + packets_dropped > 0) {
double drop_rate = (double)packets_dropped / (packets_forwarded + packets_dropped) * 100;
printf("丢包率: %.2f%%\n", drop_rate);
}
printf("================\n\n");
}

// 性能基准测试
void performance_benchmark() {
printf("=== vmsplice性能基准测试 ===\n");

int pipefd&#91;2];
if (pipe(pipefd) == -1) {
perror("创建管道失败");
return;
}

// 设置非阻塞模式
int flags = fcntl(pipefd&#91;1], F_GETFL);
fcntl(pipefd&#91;1], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(pipefd&#91;0], F_GETFL);
fcntl(pipefd&#91;0], F_SETFL, flags | O_NONBLOCK);

// 准备测试数据
size_t test_sizes&#91;] = {1024, 4096, 16384, 65536, 262144}; // 1KB到256KB
int num_sizes = sizeof(test_sizes) / sizeof(test_sizes&#91;0]);

printf("%-10s %-15s %-15s %-15s\n", "大小", "vmsplice", "write", "性能提升");
printf("%-10s %-15s %-15s %-15s\n", "----", "--------", "-----", "--------");

for (int i = 0; i < num_sizes; i++) {
size_t size = test_sizes&#91;i];
char* test_data = malloc(size);
if (!test_data) continue;

// 填充测试数据
memset(test_data, 'X', size);

struct iovec iov;
iov.iov_base = test_data;
iov.iov_len = size;

// 测试vmsplice
clock_t start = clock();
ssize_t vmsplice_result = vmsplice(pipefd&#91;1], &iov, 1, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
clock_t end = clock();
double vmsplice_time = ((double)(end - start)) / CLOCKS_PER_SEC;

// 清空管道
char dummy_buffer&#91;1024 * 1024];
while (read(pipefd&#91;0], dummy_buffer, sizeof(dummy_buffer)) > 0);

// 测试传统write
start = clock();
ssize_t write_result = write(pipefd&#91;1], test_data, size);
end = clock();
double write_time = ((double)(end - start)) / CLOCKS_PER_SEC;

// 清空管道
while (read(pipefd&#91;0], dummy_buffer, sizeof(dummy_buffer)) > 0);

double speedup = (write_time > 0) ? (write_time / vmsplice_time) : 0;

printf("%-10zu %-15.6f %-15.6f %-15.2fx\n",
size, vmsplice_time, write_time, speedup);

free(test_data);
}

close(pipefd&#91;0]);
close(pipefd&#91;1]);

printf("=== 性能基准测试完成 ===\n\n");
}

int main() {
printf("=== 网络数据转发应用示例 ===\n");

// 执行性能基准测试
performance_benchmark();

// 初始化统计
stats.start_time = time(NULL);

// 创建管道用于线程间通信
int pipefd&#91;2];
if (pipe(pipefd) == -1) {
perror("创建管道失败");
exit(EXIT_FAILURE);
}

// 设置管道为非阻塞模式
int flags = fcntl(pipefd&#91;1], F_GETFL);
fcntl(pipefd&#91;1], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(pipefd&#91;0], F_GETFL);
fcntl(pipefd&#91;0], F_SETFL, flags | O_NONBLOCK);

printf("创建转发管道: 读端=%d, 写端=%d\n", pipefd&#91;0], pipefd&#91;1]);

// 创建线程
pthread_t generator_thread, processor_thread;

// 启动数据包生成器线程
if (pthread_create(&generator_thread, NULL, packet_generator_thread, &pipefd&#91;1]) != 0) {
perror("创建生成器线程失败");
close(pipefd&#91;0]);
close(pipefd&#91;1]);
exit(EXIT_FAILURE);
}

// 启动数据包处理器线程
if (pthread_create(&processor_thread, NULL, packet_processor_thread, &pipefd&#91;0]) != 0) {
perror("创建处理器线程失败");
close(pipefd&#91;0]);
close(pipefd&#91;1]);
exit(EXIT_FAILURE);
}

// 主线程定期显示统计信息
for (int i = 0; i < 30; i++) { // 运行30秒
sleep(2);
show_forwarding_statistics();
}

// 等待线程完成
pthread_join(generator_thread, NULL);
pthread_join(processor_thread, NULL);

// 显示最终统计
show_forwarding_statistics();

// 清理资源
close(pipefd&#91;0]);
close(pipefd&#91;1]);

printf("=== 网络数据转发应用演示完成 ===\n");

return 0;
}

编译和运行

1
2
3
4
5
6
7
8
9
10
11
12
# 编译示例(需要定义_GNU_SOURCE)
gcc -D_GNU_SOURCE -o vmsplice_example1 vmsplice_example1.c
gcc -D_GNU_SOURCE -o vmsplice_example2 vmsplice_example2.c
gcc -D_GNU_SOURCE -o vmsplice_example3 vmsplice_example3.c
gcc -D_GNU_SOURCE -o vmsplice_example4 vmsplice_example4.c -lpthread

# 运行示例
./vmsplice_example1
./vmsplice_example2
./vmsplice_example3
./vmsplice_example4

重要注意事项

内核支持: vmsplice需要Linux 2.6.17或更高版本内核支持

权限要求: 需要适当的权限来创建和访问管道

内存管理: 使用SPLICE_F_GIFT标志时要注意内存所有权转移

非阻塞模式: 建议在生产环境中使用非阻塞模式避免阻塞

错误处理: 必须检查返回值并处理EAGAIN等错误

性能考虑: vmsplice在大数据传输时优势明显

线程安全: 管道操作是线程安全的

最佳实践

使用非阻塞模式: 避免无限期阻塞

合理设置标志: 根据应用场景选择合适的标志

内存管理: 正确处理内存所有权和生命周期

错误处理: 完善的错误处理和恢复机制

性能监控: 监控传输性能和系统资源使用

批量传输: 使用向量传输提高效率

资源清理: 及时关闭文件描述符和释放内存

通过这些示例,你可以理解vmsplice在高效数据传输方面的强大功能,它为Linux系统提供了零拷贝或最小拷贝的数据传输机制,特别适用于高性能网络应用、大数据处理和实时系统。

data-ad-format="auto" data-full-width-responsive="true">