sendmmsg系统调用及示例

sendmmsg 函数详解

  1. 函数介绍

sendmmsg 是Linux 2.6.39引入的高效批量发送消息系统调用。它是 sendmsg 的批量版本,允许应用程序在单次系统调用中发送多个消息,显著减少了系统调用开销,特别适用于高吞吐量的网络服务器和实时应用。

  1. 函数原型
1
2
3
4
5
#define _GNU_SOURCE
#include <sys/socket.h>
int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen,
int flags);

  1. 功能

sendmmsg 允许向套接字批量发送多个消息,每个消息可以包含数据和控制信息。它支持分散缓冲区发送、地址信息指定、控制数据发送等功能,是构建高性能网络应用的关键工具。

  1. 参数
  • int sockfd: 套接字文件描述符

  • *struct mmsghdr msgvec: 消息向量数组,描述多个发送消息

  • unsigned int vlen: 消息向量数组的长度(最大可发送的消息数)

  • int flags: 发送标志,与sendmsg相同

  1. 返回值
  • 成功: 返回实际发送的消息数量

  • 失败: 返回-1,并设置errno

  1. 相似函数,或关联函数
  • sendmsg: 单消息发送函数

  • send: 基本发送函数

  • recvmmsg: 对应的批量接收函数

  • writev: 分散缓冲区写入函数

  1. 示例代码

示例1:基础sendmmsg使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

/**
* 演示sendmmsg的基础使用方法
*/
int demo_sendmmsg_basic() {
int server_fd, client_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
struct mmsghdr msgvec&#91;3];
struct iovec iov&#91;3]&#91;1];
char *messages&#91;3] = {
"First message from sendmmsg",
"Second message from sendmmsg",
"Third message from sendmmsg"
};
int messages_sent;

printf("=== sendmmsg 基础使用示例 ===\n");

// 创建TCP服务器套接字
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("创建服务器套接字失败");
return -1;
}

// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8080);

// 绑定套接字
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server_fd);
return -1;
}

// 监听连接
if (listen(server_fd, 1) == -1) {
perror("监听失败");
close(server_fd);
return -1;
}

printf("服务器监听在端口 8080\n");

// 启动客户端连接
if (fork() == 0) {
// 客户端代码
sleep(1); // 等待服务器启动
client_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;

memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8080);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

if (connect(client_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0) {
printf("客户端连接成功\n");

// 接收服务器发送的消息
char buffer&#91;256];
for (int i = 0; i < 3; i++) {
ssize_t bytes = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0) {
buffer&#91;bytes] = '\0';
printf("客户端接收到消息 %d: %s\n", i + 1, buffer);
}
}
}

close(client_fd);
exit(0);
}

// 接受客户端连接
client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
perror("接受连接失败");
close(server_fd);
return -1;
}

printf("客户端连接来自: %s:%d\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

// 准备批量发送消息结构
memset(msgvec, 0, sizeof(msgvec));

for (int i = 0; i < 3; i++) {
// 设置每个消息的缓冲区
iov&#91;i]&#91;0].iov_base = messages&#91;i];
iov&#91;i]&#91;0].iov_len = strlen(messages&#91;i]);

// 设置消息头
msgvec&#91;i].msg_hdr.msg_iov = iov&#91;i];
msgvec&#91;i].msg_hdr.msg_iovlen = 1;
msgvec&#91;i].msg_hdr.msg_name = NULL; // TCP不需要目标地址
msgvec&#91;i].msg_hdr.msg_namelen = 0;
}

printf("准备批量发送3个消息...\n");

// 批量发送消息
messages_sent = sendmmsg(client_fd, msgvec, 3, 0);

if (messages_sent == -1) {
perror("sendmmsg 失败");
close(client_fd);
close(server_fd);
return -1;
}

printf("成功发送 %d 个消息:\n", messages_sent);

// 显示发送结果
for (int i = 0; i < messages_sent; i++) {
printf(" 消息 %d: 发送了 %u 字节\n", i + 1, msgvec&#91;i].msg_len);
}

close(client_fd);
close(server_fd);

// 等待客户端结束
int status;
wait(&status);

return 0;
}

int main() {
return demo_sendmmsg_basic();
}

示例2:UDP批量发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>

/**
* 演示UDP批量发送消息
*/
int demo_udp_batch_send() {
int client_fd;
struct sockaddr_in server_addr;
struct mmsghdr msgvec&#91;5];
struct iovec iov&#91;5]&#91;2]; // 每个消息使用2个缓冲区
char *message_parts&#91;5]&#91;2] = {
{"UDP ", "Message 1"},
{"UDP ", "Message 2"},
{"UDP ", "Message 3"},
{"UDP ", "Message 4"},
{"UDP ", "Message 5"}
};
int messages_sent;

printf("=== UDP批量发送示例 ===\n");

// 创建UDP客户端套接字
client_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (client_fd == -1) {
perror("创建UDP套接字失败");
return -1;
}

// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(8081);
server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

printf("UDP客户端准备向 127.0.0.1:8081 发送消息\n");

// 准备批量发送结构(使用分散缓冲区)
memset(msgvec, 0, sizeof(msgvec));

for (int i = 0; i < 5; i++) {
// 设置分散缓冲区
iov&#91;i]&#91;0].iov_base = message_parts&#91;i]&#91;0];
iov&#91;i]&#91;0].iov_len = strlen(message_parts&#91;i]&#91;0]);
iov&#91;i]&#91;1].iov_base = message_parts&#91;i]&#91;1];
iov&#91;i]&#91;1].iov_len = strlen(message_parts&#91;i]&#91;1]);

// 设置消息头
msgvec&#91;i].msg_hdr.msg_iov = iov&#91;i];
msgvec&#91;i].msg_hdr.msg_iovlen = 2;
msgvec&#91;i].msg_hdr.msg_name = &server_addr;
msgvec&#91;i].msg_hdr.msg_namelen = sizeof(server_addr);
}

printf("准备发送5个UDP消息(每个消息使用分散缓冲区)...\n");

// 批量发送UDP消息
messages_sent = sendmmsg(client_fd, msgvec, 5, 0);

if (messages_sent == -1) {
perror("sendmmsg 失败");
close(client_fd);
return -1;
}

printf("成功发送 %d 个UDP消息:\n", messages_sent);

// 显示发送统计
size_t total_bytes = 0;
for (int i = 0; i < messages_sent; i++) {
printf(" 消息 %d: %u 字节\n", i + 1, msgvec&#91;i].msg_len);
total_bytes += msgvec&#91;i].msg_len;
}

printf("总发送字节数: %zu\n", total_bytes);

close(client_fd);

return 0;
}

/**
* UDP服务器用于接收批量消息
*/
int udp_server_receive() {
int server_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer&#91;256];
ssize_t bytes_received;
int message_count = 0;

server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP服务器套接字失败");
return -1;
}

memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8081);

if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定UDP套接字失败");
close(server_fd);
return -1;
}

printf("UDP服务器监听在端口 8081\n");
printf("等待接收消息(10秒超时)...\n");

// 设置接收超时
struct timeval timeout;
timeout.tv_sec = 10;
timeout.tv_usec = 0;
setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));

// 接收消息
while (message_count < 10) {
bytes_received = recvfrom(server_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&client_addr, &client_len);
if (bytes_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("接收超时\n");
break;
}
perror("接收消息失败");
break;
}

buffer&#91;bytes_received] = '\0';
printf("接收到消息 %d: %s (来自 %s:%d)\n",
++message_count, buffer,
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
}

close(server_fd);
return 0;
}

int main() {
// 启动UDP服务器
if (fork() == 0) {
sleep(1); // 等待客户端准备
return udp_server_receive();
}

// 执行UDP批量发送
sleep(1);
int result = demo_udp_batch_send();

// 等待服务器结束
int status;
wait(&status);

return result;
}

示例3:高性能网络服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <time.h>

/**
* 高性能服务器结构
*/
typedef struct {
int server_fd;
int port;
struct pollfd *clients;
int max_clients;
int client_count;
unsigned long messages_sent;
unsigned long bytes_sent;
} high_perf_server_t;

/**
* 初始化高性能服务器
*/
int server_init(high_perf_server_t *server, int port, int max_clients) {
struct sockaddr_in server_addr;

memset(server, 0, sizeof(high_perf_server_t));
server->port = port;
server->max_clients = max_clients;
server->client_count = 0;
server->messages_sent = 0;
server->bytes_sent = 0;

// 分配客户端数组
server->clients = calloc(max_clients + 1, sizeof(struct pollfd));
if (!server->clients) {
perror("分配客户端数组失败");
return -1;
}

// 创建服务器套接字
server->server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server->server_fd == -1) {
perror("创建服务器套接字失败");
free(server->clients);
return -1;
}

// 设置套接字选项
int opt = 1;
if (setsockopt(server->server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
perror("设置套接字选项失败");
close(server->server_fd);
free(server->clients);
return -1;
}

// 设置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);

// 绑定套接字
if (bind(server->server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定套接字失败");
close(server->server_fd);
free(server->clients);
return -1;
}

// 监听连接
if (listen(server->server_fd, 10) == -1) {
perror("监听失败");
close(server->server_fd);
free(server->clients);
return -1;
}

// 设置服务器套接字为poll监听
server->clients&#91;0].fd = server->server_fd;
server->clients&#91;0].events = POLLIN;

printf("高性能服务器初始化完成,监听端口 %d\n", port);
return 0;
}

/**
* 接受新客户端连接
*/
int server_accept_client(high_perf_server_t *server) {
struct sockaddr_in client_addr;
socklen_t client_len = sizeof(client_addr);
int client_fd;

client_fd = accept(server->server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
perror("接受连接失败");
return -1;
}

if (server->client_count >= server->max_clients) {
printf("客户端数量已达上限,拒绝连接\n");
close(client_fd);
return -1;
}

// 添加到客户端数组
int index = server->client_count + 1;
server->clients&#91;index].fd = client_fd;
server->clients&#91;index].events = POLLOUT; // 准备发送数据
server->client_count++;

printf("新客户端连接: %s:%d (fd=%d)\n",
inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), client_fd);

return 0;
}

/**
* 使用sendmmsg向客户端批量发送消息
*/
int server_send_batch_messages(high_perf_server_t *server, int client_index) {
int client_fd = server->clients&#91;client_index].fd;
const int BATCH_SIZE = 8; // 每次批量发送8个消息
struct mmsghdr msgvec&#91;BATCH_SIZE];
struct iovec iov&#91;BATCH_SIZE]&#91;1];
char messages&#91;BATCH_SIZE]&#91;128];
int messages_to_send = 0;

// 准备批量发送消息
memset(msgvec, 0, sizeof(msgvec));

for (int i = 0; i < BATCH_SIZE; i++) {
// 构造消息内容
snprintf(messages&#91;i], sizeof(messages&#91;i]),
"Server Message %lu: Batch %d, Index %d",
server->messages_sent + i + 1,
(int)(server->messages_sent / BATCH_SIZE) + 1, i + 1);

// 设置缓冲区
iov&#91;i]&#91;0].iov_base = messages&#91;i];
iov&#91;i]&#91;0].iov_len = strlen(messages&#91;i]);

// 设置消息头
msgvec&#91;i].msg_hdr.msg_iov = iov&#91;i];
msgvec&#91;i].msg_hdr.msg_iovlen = 1;
msgvec&#91;i].msg_hdr.msg_name = NULL;
msgvec&#91;i].msg_hdr.msg_namelen = 0;

messages_to_send++;
}

// 批量发送消息
int messages_sent = sendmmsg(client_fd, msgvec, messages_to_send, MSG_NOSIGNAL);

if (messages_sent == -1) {
if (errno == EPIPE || errno == ECONNRESET) {
printf("客户端 %d 连接断开\n", client_fd);
return -1;
}
perror("sendmmsg 失败");
return -1;
}

// 更新统计信息
server->messages_sent += messages_sent;
for (int i = 0; i < messages_sent; i++) {
server->bytes_sent += msgvec&#91;i].msg_len;
}

printf("向客户端 %d 批量发送 %d 个消息\n", client_fd, messages_sent);

return 0;
}

/**
* 运行服务器主循环
*/
int server_run(high_perf_server_t *server) {
printf("服务器开始运行,等待客户端连接...\n");

time_t start_time = time(NULL);

while (difftime(time(NULL), start_time) < 30) { // 运行30秒
// 使用poll等待事件
int nfds = server->client_count + 1;
int activity = poll(server->clients, nfds, 1000); // 1秒超时

if (activity == -1) {
if (errno == EINTR) continue; // 被信号中断
perror("poll 失败");
break;
}

if (activity == 0) {
// 超时,继续循环
continue;
}

// 检查服务器套接字(新连接)
if (server->clients&#91;0].revents & POLLIN) {
server_accept_client(server);
activity--;
}

// 检查客户端套接字(准备发送数据)
for (int i = 1; i <= server->client_count && activity > 0; i++) {
if (server->clients&#91;i].revents & POLLOUT) {
if (server_send_batch_messages(server, i) == -1) {
// 客户端断开连接,移除客户端
close(server->clients&#91;i].fd);
// 将最后一个客户端移到当前位置
if (i < server->client_count) {
server->clients&#91;i] = server->clients&#91;server->client_count];
}
server->client_count--;
i--; // 重新检查当前位置
}
activity--;
}
}
}

return 0;
}

/**
* 清理服务器资源
*/
void server_cleanup(high_perf_server_t *server) {
// 关闭所有客户端连接
for (int i = 1; i <= server->client_count; i++) {
close(server->clients&#91;i].fd);
}

// 关闭服务器套接字
if (server->server_fd != -1) {
close(server->server_fd);
}

// 释放内存
if (server->clients) {
free(server->clients);
}

printf("服务器资源清理完成\n");
printf("统计信息:\n");
printf(" 总发送消息数: %lu\n", server->messages_sent);
printf(" 总发送字节数: %lu\n", server->bytes_sent);
if (server->messages_sent > 0) {
printf(" 平均消息大小: %.2f 字节\n",
(double)server->bytes_sent / server->messages_sent);
}
}

/**
* 演示高性能网络服务器
*/
int demo_high_performance_server() {
high_perf_server_t server;

printf("=== 高性能网络服务器示例 ===\n");

// 初始化服务器
if (server_init(&server, 8082, 10) != 0) {
return -1;
}

// 启动测试客户端
if (fork() == 0) {
sleep(2); // 等待服务器启动

// 创建多个并发客户端
for (int i = 0; i < 3; i++) {
if (fork() == 0) {
int client_sock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr;

memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(8082);
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

if (connect(client_sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == 0) {
printf("客户端 %d 连接成功\n", i + 1);

// 接收服务器发送的消息
char buffer&#91;256];
for (int msg = 0; msg < 20; msg++) {
ssize_t bytes = recv(client_sock, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0) {
buffer&#91;bytes] = '\0';
printf("客户端 %d 接收消息: %s\n", i + 1, buffer);
} else if (bytes == 0) {
printf("客户端 %d 连接关闭\n", i + 1);
break;
}
}
}

close(client_sock);
exit(0);
}
}

// 等待所有客户端完成
for (int i = 0; i < 3; i++) {
int status;
wait(&status);
}

exit(0);
}

// 运行服务器
server_run(&server);

// 清理资源
server_cleanup(&server);

// 等待测试客户端结束
int status;
wait(&status);

return 0;
}

int main() {
return demo_high_performance_server();
}

示例4:实时数据流发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <signal.h>

/**
* 实时数据流结构
*/
typedef struct {
int sockfd;
struct sockaddr_in dest_addr;
unsigned long packets_sent;
unsigned long bytes_sent;
time_t start_time;
volatile int running;
} data_stream_t;

// 全局变量用于信号处理
static data_stream_t *g_stream = NULL;

/**
* 信号处理函数
*/
void signal_handler(int sig) {
if (g_stream) {
g_stream->running = 0;
printf("\n收到信号 %d,准备停止数据流...\n", sig);
}
}

/**
* 初始化数据流发送器
*/
int stream_init(data_stream_t *stream, const char *dest_ip, int dest_port) {
memset(stream, 0, sizeof(data_stream_t));
stream->start_time = time(NULL);
stream->running = 1;

// 设置全局指针用于信号处理
g_stream = stream;

// 注册信号处理
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);

// 创建UDP套接字
stream->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (stream->sockfd == -1) {
perror("创建UDP套接字失败");
return -1;
}

// 设置目标地址
memset(&stream->dest_addr, 0, sizeof(stream->dest_addr));
stream->dest_addr.sin_family = AF_INET;
stream->dest_addr.sin_port = htons(dest_port);
stream->dest_addr.sin_addr.s_addr = inet_addr(dest_ip);

printf("数据流发送器初始化完成\n");
printf("目标地址: %s:%d\n", dest_ip, dest_port);

return 0;
}

/**
* 发送实时数据包批次
*/
int stream_send_batch(data_stream_t *stream) {
const int BATCH_SIZE = 16; // 每次发送16个数据包
struct mmsghdr msgvec&#91;BATCH_SIZE];
struct iovec iov&#91;BATCH_SIZE]&#91;1];
char packets&#91;BATCH_SIZE]&#91;256];
struct timespec ts;

// 准备批量发送结构
memset(msgvec, 0, sizeof(msgvec));

clock_gettime(CLOCK_REALTIME, &ts);

for (int i = 0; i < BATCH_SIZE; i++) {
// 构造实时数据包
snprintf(packets&#91;i], sizeof(packets&#91;i]),
"REALTIME_DATA:%lu.%09ld:Packet_%lu:Value_%d",
ts.tv_sec, ts.tv_nsec,
stream->packets_sent + i + 1,
rand() % 1000);

// 设置缓冲区
iov&#91;i]&#91;0].iov_base = packets&#91;i];
iov&#91;i]&#91;0].iov_len = strlen(packets&#91;i]);

// 设置消息头
msgvec&#91;i].msg_hdr.msg_iov = iov&#91;i];
msgvec&#91;i].msg_hdr.msg_iovlen = 1;
msgvec&#91;i].msg_hdr.msg_name = &stream->dest_addr;
msgvec&#91;i].msg_hdr.msg_namelen = sizeof(stream->dest_addr);
}

// 批量发送数据包
int packets_sent = sendmmsg(stream->sockfd, msgvec, BATCH_SIZE, 0);

if (packets_sent == -1) {
perror("sendmmsg 发送数据包失败");
return -1;
}

// 更新统计信息
stream->packets_sent += packets_sent;
for (int i = 0; i < packets_sent; i++) {
stream->bytes_sent += msgvec&#91;i].msg_len;
}

return packets_sent;
}

/**
* 显示实时统计信息
*/
void stream_show_stats(data_stream_t *stream) {
time_t current_time = time(NULL);
double uptime = difftime(current_time, stream->start_time);

if (uptime > 0) {
double packets_per_sec = stream->packets_sent / uptime;
double bytes_per_sec = stream->bytes_sent / uptime;

printf("\r运行时间: %.0fs | 数据包: %lu | 字节: %lu | "
"速率: %.0f包/s %.2fMB/s",
uptime, stream->packets_sent, stream->bytes_sent,
packets_per_sec, bytes_per_sec / (1024 * 1024));
fflush(stdout);
}
}

/**
* 运行数据流发送器
*/
int stream_run(data_stream_t *stream) {
printf("数据流发送器开始运行,按 Ctrl+C 停止\n");

time_t last_stats_time = time(NULL);
int batch_count = 0;

while (stream->running) {
int result = stream_send_batch(stream);
if (result == -1) {
break;
}

batch_count++;

// 定期显示统计信息
time_t current_time = time(NULL);
if (current_time - last_stats_time >= 1) {
stream_show_stats(stream);
last_stats_time = current_time;
}

// 控制发送速率(每秒约1000个数据包)
if (batch_count % 64 == 0) {
usleep(10000); // 10ms延迟
}
}

printf("\n数据流发送器停止\n");
return 0;
}

/**
* 演示实时数据流发送
*/
int demo_real_time_data_stream() {
data_stream_t stream;

printf("=== 实时数据流发送示例 ===\n");

// 初始化数据流发送器
if (stream_init(&stream, "127.0.0.1", 8083) != 0) {
return -1;
}

// 启动数据接收器
if (fork() == 0) {
int server_fd;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer&#91;512];
ssize_t bytes_received;
int packet_count = 0;

server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建接收器套接字失败");
exit(1);
}

memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(8083);

if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定接收器套接字失败");
close(server_fd);
exit(1);
}

printf("数据接收器启动,监听端口 8083\n");

// 设置接收超时
struct timeval timeout;
timeout.tv_sec = 30;
timeout.tv_usec = 0;
setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));

// 接收数据包
while (packet_count < 1000) {
bytes_received = recvfrom(server_fd, buffer, sizeof(buffer) - 1, 0,
(struct sockaddr*)&client_addr, &client_len);
if (bytes_received == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("接收超时\n");
break;
}
perror("接收数据包失败");
break;
}

buffer&#91;bytes_received] = '\0';
packet_count++;

if (packet_count % 100 == 0) {
printf("接收器: 接收到 %d 个数据包\n", packet_count);
}
}

printf("接收器: 总共接收到 %d 个数据包\n", packet_count);
close(server_fd);
exit(0);
}

// 运行数据流发送器
srand(time(NULL));
stream_run(&stream);

// 显示最终统计
printf("\n\n最终统计:\n");
printf(" 总发送数据包: %lu\n", stream.packets_sent);
printf(" 总发送字节数: %lu\n", stream.bytes_sent);
printf(" 平均包大小: %.2f 字节\n",
stream.packets_sent > 0 ? (double)stream.bytes_sent / stream.packets_sent : 0);

// 清理资源
close(stream.sockfd);

// 等待接收器结束
int status;
wait(&status);

return 0;
}

int main() {
return demo_real_time_data_stream();
}

示例5:性能对比测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
#define _GNU_SOURCE
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>

/**
* 性能测试结构
*/
typedef struct {
const char *name;
unsigned long messages_sent;
unsigned long bytes_sent;
struct timeval start_time;
struct timeval end_time;
} perf_test_t;

/**
* 使用传统sendmsg进行测试
*/
int test_sendmsg_performance(int sockfd, struct sockaddr_in *dest_addr,
int message_count, const char *message) {
struct msghdr msg;
struct iovec iov&#91;1];
int sent_count = 0;

// 准备发送结构
memset(&msg, 0, sizeof(msg));
iov&#91;0].iov_base = (void*)message;
iov&#91;0].iov_len = strlen(message);

msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_name = dest_addr;
msg.msg_namelen = sizeof(*dest_addr);

// 逐个发送消息
while (sent_count < message_count) {
if (sendmsg(sockfd, &msg, 0) == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("sendmsg 失败");
break;
}
} else {
sent_count++;
}
}

return sent_count;
}

/**
* 使用sendmmsg进行测试
*/
int test_sendmmsg_performance(int sockfd, struct sockaddr_in *dest_addr,
int message_count, const char *message) {
const int BATCH_SIZE = 32;
struct mmsghdr msgvec&#91;BATCH_SIZE];
struct iovec iov&#91;BATCH_SIZE]&#91;1];
char *messages&#91;BATCH_SIZE];
int total_sent = 0;
int messages_sent;

// 准备批量发送结构
memset(msgvec, 0, sizeof(msgvec));

for (int i = 0; i < BATCH_SIZE; i++) {
messages&#91;i] = strdup(message);
if (!messages&#91;i]) {
perror("分配消息缓冲区失败");
return total_sent;
}

iov&#91;i]&#91;0].iov_base = messages&#91;i];
iov&#91;i]&#91;0].iov_len = strlen(message);

msgvec&#91;i].msg_hdr.msg_iov = iov&#91;i];
msgvec&#91;i].msg_hdr.msg_iovlen = 1;
msgvec&#91;i].msg_hdr.msg_name = dest_addr;
msgvec&#91;i].msg_hdr.msg_namelen = sizeof(*dest_addr);
}

// 批量发送消息
while (total_sent < message_count) {
int to_send = (message_count - total_sent < BATCH_SIZE) ?
message_count - total_sent : BATCH_SIZE;

messages_sent = sendmmsg(sockfd, msgvec, to_send, 0);

if (messages_sent == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("sendmmsg 失败");
break;
}
} else {
total_sent += messages_sent;
}
}

// 释放内存
for (int i = 0; i < BATCH_SIZE; i++) {
free(messages&#91;i]);
}

return total_sent;
}

/**
* UDP性能测试服务器
*/
int perf_test_server(int port) {
int server_fd;
struct sockaddr_in server_addr;

server_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (server_fd == -1) {
perror("创建UDP服务器套接字失败");
return -1;
}

memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);

if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("绑定服务器套接字失败");
close(server_fd);
return -1;
}

printf("性能测试服务器启动,监听端口 %d\n", port);
return server_fd;
}

/**
* 演示性能对比测试
*/
int demo_performance_comparison() {
int server_fd, client_fd;
struct sockaddr_in server_addr, dest_addr;
const int SERVER_PORT = 8084;
const int MESSAGE_COUNT = 10000;
const char *test_message = "Performance test message for sendmmsg vs sendmsg comparison";
perf_test_t tests&#91;2];

printf("=== sendmmsg vs sendmsg 性能对比测试 ===\n");

// 初始化测试结构
tests&#91;0].name = "sendmsg";
tests&#91;0].messages_sent = 0;
tests&#91;0].bytes_sent = 0;

tests&#91;1].name = "sendmmsg";
tests&#91;1].messages_sent = 0;
tests&#91;1].bytes_sent = 0;

// 启动服务器
server_fd = perf_test_server(SERVER_PORT);
if (server_fd == -1) {
return -1;
}

// 创建客户端套接字
client_fd = socket(AF_INET, SOCK_DGRAM, 0);
if (client_fd == -1) {
perror("创建客户端套接字失败");
close(server_fd);
return -1;
}

// 设置目标地址
memset(&dest_addr, 0, sizeof(dest_addr));
dest_addr.sin_family = AF_INET;
dest_addr.sin_port = htons(SERVER_PORT);
dest_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

// 启动服务器接收线程
if (fork() == 0) {
char buffer&#91;512];
int received_count = 0;

printf("服务器开始接收测试消息...\n");

// 设置接收超时
struct timeval timeout;
timeout.tv_sec = 30;
timeout.tv_usec = 0;
setsockopt(server_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));

while (received_count < MESSAGE_COUNT * 2) { // 两次测试
ssize_t bytes = recv(server_fd, buffer, sizeof(buffer) - 1, 0);
if (bytes > 0) {
received_count++;
if (received_count % 1000 == 0) {
printf("服务器已接收 %d 个消息\n", received_count);
}
} else if (bytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
printf("服务器接收超时\n");
break;
}
}
}

printf("服务器接收完成,总共接收 %d 个消息\n", received_count);
close(server_fd);
exit(0);
}

// 等待服务器启动
sleep(1);

// 测试1: 使用sendmsg
printf("\n测试1: 使用传统sendmsg发送 %d 个消息...\n", MESSAGE_COUNT);
gettimeofday(&tests&#91;0].start_time, NULL);

tests&#91;0].messages_sent = test_sendmsg_performance(client_fd, &dest_addr,
MESSAGE_COUNT, test_message);
tests&#91;0].bytes_sent = tests&#91;0].messages_sent * strlen(test_message);

gettimeofday(&tests&#91;0].end_time, NULL);

printf("sendmsg测试完成: 发送 %lu 个消息\n", tests&#91;0].messages_sent);

// 短暂休息
sleep(1);

// 测试2: 使用sendmmsg
printf("\n测试2: 使用sendmmsg发送 %d 个消息...\n", MESSAGE_COUNT);
gettimeofday(&tests&#91;1].start_time, NULL);

tests&#91;1].messages_sent = test_sendmmsg_performance(client_fd, &dest_addr,
MESSAGE_COUNT, test_message);
tests&#91;1].bytes_sent = tests&#91;1].messages_sent * strlen(test_message);

gettimeofday(&tests&#91;1].end_time, NULL);

printf("sendmmsg测试完成: 发送 %lu 个消息\n", tests&#91;1].messages_sent);

// 计算并显示结果
printf("\n=== 性能测试结果 ===\n");

for (int i = 0; i < 2; i++) {
double elapsed_time = (tests&#91;i].end_time.tv_sec - tests&#91;i].start_time.tv_sec) +
(tests&#91;i].end_time.tv_usec - tests&#91;i].start_time.tv_usec) / 1000000.0;

double messages_per_sec = tests&#91;i].messages_sent / elapsed_time;
double bytes_per_sec = tests&#91;i].bytes_sent / elapsed_time;

printf("%s 测试:\n", tests&#91;i].name);
printf(" 发送消息数: %lu\n", tests&#91;i].messages_sent);
printf(" 发送字节数: %lu\n", tests&#91;i].bytes_sent);
printf(" 耗时: %.3f 秒\n", elapsed_time);
printf(" 吞吐量: %.0f 消息/秒 (%.2f MB/s)\n",
messages_per_sec, bytes_per_sec / (1024 * 1024));
printf(" 平均延迟: %.3f 微秒/消息\n",
(elapsed_time * 1000000) / tests&#91;i].messages_sent);
printf("\n");
}

// 计算性能提升
double sendmsg_time = (tests&#91;0].end_time.tv_sec - tests&#91;0].start_time.tv_sec) +
(tests&#91;0].end_time.tv_usec - tests&#91;0].start_time.tv_usec) / 1000000.0;
double sendmmsg_time = (tests&#91;1].end_time.tv_sec - tests&#91;1].start_time.tv_sec) +
(tests&#91;1].end_time.tv_usec - tests&#91;1].start_time.tv_usec) / 1000000.0;

if (sendmsg_time > 0 && sendmmsg_time > 0) {
double improvement = (sendmsg_time - sendmmsg_time) / sendmsg_time * 100;
printf("性能提升: %.1f%%\n", improvement);
}

close(client_fd);

// 等待服务器结束
int status;
wait(&status);

return 0;
}

int main() {
return demo_performance_comparison();
}

sendmmsg 标志参数详解

常用标志:

  • MSG_CONFIRM: 提供路径确认反馈

  • MSG_DONTROUTE: 不使用网关路由

  • MSG_DONTWAIT: 非阻塞操作

  • MSG_EOR: 发送记录结束标记

  • MSG_MORE: 还有更多数据要发送

  • MSG_NOSIGNAL: 发送时不产生SIGPIPE信号

  • MSG_OOB: 发送带外数据

高级标志:

  • MSG_PROXY: SOCKS代理相关

  • MSG_TRYHARD: 尽力发送(已废弃)

使用注意事项

系统要求:

内核版本: 需要Linux 2.6.39或更高版本

data-ad-format="fluid" data-ad-layout-key="-7k+ex-4a-9w+4a">

glibc版本: 需要支持sendmmsg的glibc版本

编译选项: 需要定义_GNU_SOURCE

性能优化:

批量大小: 根据应用需求选择合适的批量大小

缓冲区管理: 合理分配缓冲区避免内存浪费

错误处理: 妥善处理部分发送的情况

错误处理:

部分发送: 处理实际发送消息数少于请求的情况

连接状态: 检查连接是否正常

资源清理: 及时关闭套接字和释放内存

安全考虑:

缓冲区溢出: 确保消息缓冲区大小正确

输入验证: 验证发送的数据内容

权限检查: 确保有适当的网络访问权限

sendmmsg 优势

1. 性能优势:

  • 减少系统调用: 单次调用发送多个消息

  • 降低上下文切换: 减少用户态和内核态切换

  • 提高吞吐量: 特别适用于小消息高频发送场景

2. 功能优势:

  • 完整功能: 支持sendmsg的所有功能

  • 分散缓冲区: 每个消息可使用多个缓冲区

  • 控制信息: 支持发送辅助控制数据

3. 应用场景:

  • 高并发服务器: Web服务器、游戏服务器

  • 实时系统: 音视频流传输、传感器数据采集

  • 消息队列: 批量消息处理系统

  • 网络协议: 实现自定义网络协议

总结

sendmmsg 是构建高性能网络应用的重要工具,它提供了:

  • 批量消息发送能力,显著减少系统调用开销

  • 与sendmsg相同的完整功能集

  • 更好的性能和可扩展性

  • 适用于高吞吐量的实时应用

通过合理使用 sendmmsg,可以大幅提升网络应用的性能,特别是在需要发送大量小消息的场景中效果显著。在实际应用中,需要注意批量大小的选择、错误处理和系统兼容性等问题。

data-ad-format="auto" data-full-width-responsive="true">