blob: 244568e558948360674d7c14184a504278f37b28 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2006, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include <time.h>
29#ifdef POSIX
30#include <netinet/in.h>
31#endif
32#include <cmath>
33
34#include "talk/base/logging.h"
35#include "talk/base/gunit.h"
36#include "talk/base/testclient.h"
37#include "talk/base/testutils.h"
38#include "talk/base/thread.h"
39#include "talk/base/timeutils.h"
40#include "talk/base/virtualsocketserver.h"
41
42using namespace talk_base;
43
44// Sends at a constant rate but with random packet sizes.
45struct Sender : public MessageHandler {
46 Sender(Thread* th, AsyncSocket* s, uint32 rt)
47 : thread(th), socket(new AsyncUDPSocket(s)),
48 done(false), rate(rt), count(0) {
49 last_send = Time();
50 thread->PostDelayed(NextDelay(), this, 1);
51 }
52
53 uint32 NextDelay() {
54 uint32 size = (rand() % 4096) + 1;
55 return 1000 * size / rate;
56 }
57
58 void OnMessage(Message* pmsg) {
59 ASSERT_EQ(1u, pmsg->message_id);
60
61 if (done)
62 return;
63
64 uint32 cur_time = Time();
65 uint32 delay = cur_time - last_send;
66 uint32 size = rate * delay / 1000;
67 size = std::min<uint32>(size, 4096);
68 size = std::max<uint32>(size, sizeof(uint32));
69
70 count += size;
71 memcpy(dummy, &cur_time, sizeof(cur_time));
72 socket->Send(dummy, size);
73
74 last_send = cur_time;
75 thread->PostDelayed(NextDelay(), this, 1);
76 }
77
78 Thread* thread;
79 scoped_ptr<AsyncUDPSocket> socket;
80 bool done;
81 uint32 rate; // bytes per second
82 uint32 count;
83 uint32 last_send;
84 char dummy[4096];
85};
86
87struct Receiver : public MessageHandler, public sigslot::has_slots<> {
88 Receiver(Thread* th, AsyncSocket* s, uint32 bw)
89 : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
90 count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
91 socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
92 thread->PostDelayed(1000, this, 1);
93 }
94
95 ~Receiver() {
96 thread->Clear(this);
97 }
98
99 void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
100 const SocketAddress& remote_addr) {
101 ASSERT_EQ(socket.get(), s);
102 ASSERT_GE(size, 4U);
103
104 count += size;
105 sec_count += size;
106
107 uint32 send_time = *reinterpret_cast<const uint32*>(data);
108 uint32 recv_time = Time();
109 uint32 delay = recv_time - send_time;
110 sum += delay;
111 sum_sq += delay * delay;
112 samples += 1;
113 }
114
115 void OnMessage(Message* pmsg) {
116 ASSERT_EQ(1u, pmsg->message_id);
117
118 if (done)
119 return;
120
121 // It is always possible for us to receive more than expected because
122 // packets can be further delayed in delivery.
123 if (bandwidth > 0)
124 ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
125 sec_count = 0;
126 thread->PostDelayed(1000, this, 1);
127 }
128
129 Thread* thread;
130 scoped_ptr<AsyncUDPSocket> socket;
131 uint32 bandwidth;
132 bool done;
133 size_t count;
134 size_t sec_count;
135 double sum;
136 double sum_sq;
137 uint32 samples;
138};
139
140class VirtualSocketServerTest : public testing::Test {
141 public:
142 VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
143 kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
144 kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
145 }
146
147 void CheckAddressIncrementalization(const SocketAddress& post,
148 const SocketAddress& pre) {
149 EXPECT_EQ(post.port(), pre.port() + 1);
150 IPAddress post_ip = post.ipaddr();
151 IPAddress pre_ip = pre.ipaddr();
152 EXPECT_EQ(pre_ip.family(), post_ip.family());
153 if (post_ip.family() == AF_INET) {
154 in_addr pre_ipv4 = pre_ip.ipv4_address();
155 in_addr post_ipv4 = post_ip.ipv4_address();
156 int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
157 EXPECT_EQ(1, difference);
158 } else if (post_ip.family() == AF_INET6) {
159 in6_addr post_ip6 = post_ip.ipv6_address();
160 in6_addr pre_ip6 = pre_ip.ipv6_address();
161 uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
162 uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
163 EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
164 }
165 }
166
167 void BasicTest(const SocketAddress& initial_addr) {
168 AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
169 SOCK_DGRAM);
170 socket->Bind(initial_addr);
171 SocketAddress server_addr = socket->GetLocalAddress();
172 // Make sure VSS didn't switch families on us.
173 EXPECT_EQ(server_addr.family(), initial_addr.family());
174
175 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
176 AsyncSocket* socket2 =
177 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
178 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
179
180 SocketAddress client2_addr;
181 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
182 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
183
184 SocketAddress client1_addr;
185 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
186 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
187 EXPECT_EQ(client1_addr, server_addr);
188
189 SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
190 for (int i = 0; i < 10; i++) {
191 client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
192
193 SocketAddress next_client2_addr;
194 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
195 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
196 CheckAddressIncrementalization(next_client2_addr, client2_addr);
197 // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
198
199 SocketAddress server_addr2;
200 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
201 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
202 EXPECT_EQ(server_addr2, server_addr);
203
204 client2_addr = next_client2_addr;
205 }
206 }
207
208 // initial_addr should be made from either INADDR_ANY or in6addr_any.
209 void ConnectTest(const SocketAddress& initial_addr) {
210 testing::StreamSink sink;
211 SocketAddress accept_addr;
212 const SocketAddress kEmptyAddr =
213 EmptySocketAddressWithFamily(initial_addr.family());
214
215 // Create client
216 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
217 SOCK_STREAM);
218 sink.Monitor(client);
219 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
220 EXPECT_TRUE(client->GetLocalAddress().IsNil());
221
222 // Create server
223 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
224 SOCK_STREAM);
225 sink.Monitor(server);
226 EXPECT_NE(0, server->Listen(5)); // Bind required
227 EXPECT_EQ(0, server->Bind(initial_addr));
228 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
229 EXPECT_EQ(0, server->Listen(5));
230 EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
231
232 // No pending server connections
233 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
234 EXPECT_TRUE(NULL == server->Accept(&accept_addr));
235 EXPECT_EQ(AF_UNSPEC, accept_addr.family());
236
237 // Attempt connect to listening socket
238 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
239 EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind
240 EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family()); // Implicit Bind
241 EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
242
243 // Client is connecting
244 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
245 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
246 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
247
248 ss_->ProcessMessagesUntilIdle();
249
250 // Client still connecting
251 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
252 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
253 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
254
255 // Server has pending connection
256 EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
257 Socket* accepted = server->Accept(&accept_addr);
258 EXPECT_TRUE(NULL != accepted);
259 EXPECT_NE(accept_addr, kEmptyAddr);
260 EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
261
262 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
263 EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
264 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
265
266 ss_->ProcessMessagesUntilIdle();
267
268 // Client has connected
269 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
270 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
271 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
272 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
273 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
274 }
275
276 void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
277 testing::StreamSink sink;
278 SocketAddress accept_addr;
279 const SocketAddress nil_addr;
280 const SocketAddress empty_addr =
281 EmptySocketAddressWithFamily(initial_addr.family());
282
283 // Create client
284 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
285 SOCK_STREAM);
286 sink.Monitor(client);
287
288 // Create server
289 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
290 SOCK_STREAM);
291 sink.Monitor(server);
292 EXPECT_EQ(0, server->Bind(initial_addr));
293 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
294 // Attempt connect to non-listening socket
295 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
296
297 ss_->ProcessMessagesUntilIdle();
298
299 // No pending server connections
300 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
301 EXPECT_TRUE(NULL == server->Accept(&accept_addr));
302 EXPECT_EQ(accept_addr, nil_addr);
303
304 // Connection failed
305 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
306 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
307 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
308 EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
309 }
310
311 void CloseDuringConnectTest(const SocketAddress& initial_addr) {
312 testing::StreamSink sink;
313 SocketAddress accept_addr;
314 const SocketAddress empty_addr =
315 EmptySocketAddressWithFamily(initial_addr.family());
316
317 // Create client and server
318 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
319 SOCK_STREAM);
320 sink.Monitor(client);
321 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
322 SOCK_STREAM);
323 sink.Monitor(server);
324
325 // Initiate connect
326 EXPECT_EQ(0, server->Bind(initial_addr));
327 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
328
329 EXPECT_EQ(0, server->Listen(5));
330 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
331
332 // Server close before socket enters accept queue
333 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
334 server->Close();
335
336 ss_->ProcessMessagesUntilIdle();
337
338 // Result: connection failed
339 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
340 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
341
342 // New server
343 delete server;
344 server = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
345 sink.Monitor(server);
346
347 // Initiate connect
348 EXPECT_EQ(0, server->Bind(initial_addr));
349 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
350
351 EXPECT_EQ(0, server->Listen(5));
352 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
353
354 ss_->ProcessMessagesUntilIdle();
355
356 // Server close while socket is in accept queue
357 EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
358 server->Close();
359
360 ss_->ProcessMessagesUntilIdle();
361
362 // Result: connection failed
363 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
364 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
365
366 // New server
367 delete server;
368 server = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
369 sink.Monitor(server);
370
371 // Initiate connect
372 EXPECT_EQ(0, server->Bind(initial_addr));
373 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
374
375 EXPECT_EQ(0, server->Listen(5));
376 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
377
378 ss_->ProcessMessagesUntilIdle();
379
380 // Server accepts connection
381 EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
382 AsyncSocket* accepted = server->Accept(&accept_addr);
383 ASSERT_TRUE(NULL != accepted);
384 sink.Monitor(accepted);
385
386 // Client closes before connection complets
387 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
388
389 // Connected message has not been processed yet.
390 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
391 client->Close();
392
393 ss_->ProcessMessagesUntilIdle();
394
395 // Result: accepted socket closes
396 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
397 EXPECT_TRUE(sink.Check(accepted, testing::SSE_CLOSE));
398 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
399 }
400
401 void CloseTest(const SocketAddress& initial_addr) {
402 testing::StreamSink sink;
403 const SocketAddress kEmptyAddr;
404
405 // Create clients
406 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
407 sink.Monitor(a);
408 a->Bind(initial_addr);
409 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
410
411
412 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
413 sink.Monitor(b);
414 b->Bind(initial_addr);
415 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
416
417 EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
418 EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
419
420 ss_->ProcessMessagesUntilIdle();
421
422 EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
423 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
424 EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
425
426 EXPECT_TRUE(sink.Check(b, testing::SSE_OPEN));
427 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
428 EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
429
430 EXPECT_EQ(1, a->Send("a", 1));
431 b->Close();
432 EXPECT_EQ(1, a->Send("b", 1));
433
434 ss_->ProcessMessagesUntilIdle();
435
436 char buffer[10];
437 EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
438 EXPECT_EQ(-1, b->Recv(buffer, 10));
439
440 EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
441 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
442 EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
443
444 EXPECT_FALSE(sink.Check(b, testing::SSE_CLOSE)); // No signal for Closer
445 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
446 EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
447 }
448
449 void TcpSendTest(const SocketAddress& initial_addr) {
450 testing::StreamSink sink;
451 const SocketAddress kEmptyAddr;
452
453 // Connect two sockets
454 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
455 sink.Monitor(a);
456 a->Bind(initial_addr);
457 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
458
459 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
460 sink.Monitor(b);
461 b->Bind(initial_addr);
462 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
463
464 EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
465 EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
466
467 ss_->ProcessMessagesUntilIdle();
468
469 const size_t kBufferSize = 2000;
470 ss_->set_send_buffer_capacity(kBufferSize);
471 ss_->set_recv_buffer_capacity(kBufferSize);
472
473 const size_t kDataSize = 5000;
474 char send_buffer[kDataSize], recv_buffer[kDataSize];
475 for (size_t i = 0; i < kDataSize; ++i)
476 send_buffer[i] = static_cast<char>(i % 256);
477 memset(recv_buffer, 0, sizeof(recv_buffer));
478 size_t send_pos = 0, recv_pos = 0;
479
480 // Can't send more than send buffer in one write
481 int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
482 EXPECT_EQ(static_cast<int>(kBufferSize), result);
483 send_pos += result;
484
485 ss_->ProcessMessagesUntilIdle();
486 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
487 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
488
489 // Receive buffer is already filled, fill send buffer again
490 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
491 EXPECT_EQ(static_cast<int>(kBufferSize), result);
492 send_pos += result;
493
494 ss_->ProcessMessagesUntilIdle();
495 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
496 EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
497
498 // No more room in send or receive buffer
499 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
500 EXPECT_EQ(-1, result);
501 EXPECT_TRUE(a->IsBlocking());
502
503 // Read a subset of the data
504 result = b->Recv(recv_buffer + recv_pos, 500);
505 EXPECT_EQ(500, result);
506 recv_pos += result;
507
508 ss_->ProcessMessagesUntilIdle();
509 EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
510 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
511
512 // Room for more on the sending side
513 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
514 EXPECT_EQ(500, result);
515 send_pos += result;
516
517 // Empty the recv buffer
518 while (true) {
519 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
520 if (result < 0) {
521 EXPECT_EQ(-1, result);
522 EXPECT_TRUE(b->IsBlocking());
523 break;
524 }
525 recv_pos += result;
526 }
527
528 ss_->ProcessMessagesUntilIdle();
529 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
530
531 // Continue to empty the recv buffer
532 while (true) {
533 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
534 if (result < 0) {
535 EXPECT_EQ(-1, result);
536 EXPECT_TRUE(b->IsBlocking());
537 break;
538 }
539 recv_pos += result;
540 }
541
542 // Send last of the data
543 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
544 EXPECT_EQ(500, result);
545 send_pos += result;
546
547 ss_->ProcessMessagesUntilIdle();
548 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
549
550 // Receive the last of the data
551 while (true) {
552 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
553 if (result < 0) {
554 EXPECT_EQ(-1, result);
555 EXPECT_TRUE(b->IsBlocking());
556 break;
557 }
558 recv_pos += result;
559 }
560
561 ss_->ProcessMessagesUntilIdle();
562 EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
563
564 // The received data matches the sent data
565 EXPECT_EQ(kDataSize, send_pos);
566 EXPECT_EQ(kDataSize, recv_pos);
567 EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
568 }
569
570 void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
571 const SocketAddress kEmptyAddr;
572
573 // Connect two sockets
574 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
575 SOCK_STREAM);
576 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
577 SOCK_STREAM);
578 a->Bind(initial_addr);
579 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
580
581 b->Bind(initial_addr);
582 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
583
584 EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
585 EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
586 ss_->ProcessMessagesUntilIdle();
587
588 // First, deliver all packets in 0 ms.
589 char buffer[2] = { 0, 0 };
590 const char cNumPackets = 10;
591 for (char i = 0; i < cNumPackets; ++i) {
592 buffer[0] = '0' + i;
593 EXPECT_EQ(1, a->Send(buffer, 1));
594 }
595
596 ss_->ProcessMessagesUntilIdle();
597
598 for (char i = 0; i < cNumPackets; ++i) {
599 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
600 EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
601 }
602
603 // Next, deliver packets at random intervals
604 const uint32 mean = 50;
605 const uint32 stddev = 50;
606
607 ss_->set_delay_mean(mean);
608 ss_->set_delay_stddev(stddev);
609 ss_->UpdateDelayDistribution();
610
611 for (char i = 0; i < cNumPackets; ++i) {
612 buffer[0] = 'A' + i;
613 EXPECT_EQ(1, a->Send(buffer, 1));
614 }
615
616 ss_->ProcessMessagesUntilIdle();
617
618 for (char i = 0; i < cNumPackets; ++i) {
619 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
620 EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
621 }
622 }
623
624 void BandwidthTest(const SocketAddress& initial_addr) {
625 AsyncSocket* send_socket =
626 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
627 AsyncSocket* recv_socket =
628 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
629 ASSERT_EQ(0, send_socket->Bind(initial_addr));
630 ASSERT_EQ(0, recv_socket->Bind(initial_addr));
631 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
632 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
633 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
634
635 uint32 bandwidth = 64 * 1024;
636 ss_->set_bandwidth(bandwidth);
637
638 Thread* pthMain = Thread::Current();
639 Sender sender(pthMain, send_socket, 80 * 1024);
640 Receiver receiver(pthMain, recv_socket, bandwidth);
641
642 pthMain->ProcessMessages(5000);
643 sender.done = true;
644 pthMain->ProcessMessages(5000);
645
646 ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
647 ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s
648
649 ss_->set_bandwidth(0);
650 }
651
652 void DelayTest(const SocketAddress& initial_addr) {
653 time_t seed = ::time(NULL);
654 LOG(LS_VERBOSE) << "seed = " << seed;
655 srand(static_cast<unsigned int>(seed));
656
657 const uint32 mean = 2000;
658 const uint32 stddev = 500;
659
660 ss_->set_delay_mean(mean);
661 ss_->set_delay_stddev(stddev);
662 ss_->UpdateDelayDistribution();
663
664 AsyncSocket* send_socket =
665 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
666 AsyncSocket* recv_socket =
667 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
668 ASSERT_EQ(0, send_socket->Bind(initial_addr));
669 ASSERT_EQ(0, recv_socket->Bind(initial_addr));
670 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
671 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
672 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
673
674 Thread* pthMain = Thread::Current();
675 // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
676 // 1000 packets, which is necessary to get a good distribution.
677 Sender sender(pthMain, send_socket, 100 * 2 * 1024);
678 Receiver receiver(pthMain, recv_socket, 0);
679
680 pthMain->ProcessMessages(10000);
681 sender.done = receiver.done = true;
682 ss_->ProcessMessagesUntilIdle();
683
684 const double sample_mean = receiver.sum / receiver.samples;
685 double num =
686 receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
687 double den = receiver.samples * (receiver.samples - 1);
688 const double sample_stddev = std::sqrt(num / den);
689 LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
690
691 EXPECT_LE(500u, receiver.samples);
692 // We initially used a 0.1 fudge factor, but on the build machine, we
693 // have seen the value differ by as much as 0.13.
694 EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
695 EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
696
697 ss_->set_delay_mean(0);
698 ss_->set_delay_stddev(0);
699 ss_->UpdateDelayDistribution();
700 }
701
702 // Test cross-family communication between a client bound to client_addr and a
703 // server bound to server_addr. shouldSucceed indicates if communication is
704 // expected to work or not.
705 void CrossFamilyConnectionTest(const SocketAddress& client_addr,
706 const SocketAddress& server_addr,
707 bool shouldSucceed) {
708 testing::StreamSink sink;
709 SocketAddress accept_address;
710 const SocketAddress kEmptyAddr;
711
712 // Client gets a IPv4 address
713 AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
714 SOCK_STREAM);
715 sink.Monitor(client);
716 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
717 EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
718 client->Bind(client_addr);
719
720 // Server gets a non-mapped non-any IPv6 address.
721 // IPv4 sockets should not be able to connect to this.
722 AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
723 SOCK_STREAM);
724 sink.Monitor(server);
725 server->Bind(server_addr);
726 server->Listen(5);
727
728 if (shouldSucceed) {
729 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
730 ss_->ProcessMessagesUntilIdle();
731 EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
732 Socket* accepted = server->Accept(&accept_address);
733 EXPECT_TRUE(NULL != accepted);
734 EXPECT_NE(kEmptyAddr, accept_address);
735 ss_->ProcessMessagesUntilIdle();
736 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
737 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
738 } else {
739 // Check that the connection failed.
740 EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
741 ss_->ProcessMessagesUntilIdle();
742
743 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
744 EXPECT_TRUE(NULL == server->Accept(&accept_address));
745 EXPECT_EQ(accept_address, kEmptyAddr);
746 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
747 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
748 EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
749 }
750 }
751
752 // Test cross-family datagram sending between a client bound to client_addr
753 // and a server bound to server_addr. shouldSucceed indicates if sending is
754 // expected to succed or not.
755 void CrossFamilyDatagramTest(const SocketAddress& client_addr,
756 const SocketAddress& server_addr,
757 bool shouldSucceed) {
758 AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
759 socket->Bind(server_addr);
760 SocketAddress bound_server_addr = socket->GetLocalAddress();
761 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
762
763 AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
764 socket2->Bind(client_addr);
765 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
766 SocketAddress client2_addr;
767
768 if (shouldSucceed) {
769 EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
770 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
771 SocketAddress client1_addr;
772 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
773 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
774 EXPECT_EQ(client1_addr, bound_server_addr);
775 } else {
776 EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
777 EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
778 }
779 }
780
781 protected:
782 virtual void SetUp() {
783 Thread::Current()->set_socketserver(ss_);
784 }
785 virtual void TearDown() {
786 Thread::Current()->set_socketserver(NULL);
787 }
788
789 VirtualSocketServer* ss_;
790 const SocketAddress kIPv4AnyAddress;
791 const SocketAddress kIPv6AnyAddress;
792};
793
794TEST_F(VirtualSocketServerTest, basic_v4) {
795 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
796 BasicTest(ipv4_test_addr);
797}
798
799TEST_F(VirtualSocketServerTest, basic_v6) {
800 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
801 BasicTest(ipv6_test_addr);
802}
803
804TEST_F(VirtualSocketServerTest, connect_v4) {
805 ConnectTest(kIPv4AnyAddress);
806}
807
808TEST_F(VirtualSocketServerTest, connect_v6) {
809 ConnectTest(kIPv6AnyAddress);
810}
811
812TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
813 ConnectToNonListenerTest(kIPv4AnyAddress);
814}
815
816TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
817 ConnectToNonListenerTest(kIPv6AnyAddress);
818}
819
820TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
821 CloseDuringConnectTest(kIPv4AnyAddress);
822}
823
824TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
825 CloseDuringConnectTest(kIPv6AnyAddress);
826}
827
828TEST_F(VirtualSocketServerTest, close_v4) {
829 CloseTest(kIPv4AnyAddress);
830}
831
832TEST_F(VirtualSocketServerTest, close_v6) {
833 CloseTest(kIPv6AnyAddress);
834}
835
836TEST_F(VirtualSocketServerTest, tcp_send_v4) {
837 TcpSendTest(kIPv4AnyAddress);
838}
839
840TEST_F(VirtualSocketServerTest, tcp_send_v6) {
841 TcpSendTest(kIPv6AnyAddress);
842}
843
844TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
845 TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
846}
847
848TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
849 TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
850}
851
852TEST_F(VirtualSocketServerTest, bandwidth_v4) {
853 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
854 BandwidthTest(ipv4_test_addr);
855}
856
857TEST_F(VirtualSocketServerTest, bandwidth_v6) {
858 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
859 BandwidthTest(ipv6_test_addr);
860}
861
862TEST_F(VirtualSocketServerTest, delay_v4) {
863 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
864 DelayTest(ipv4_test_addr);
865}
866
867TEST_F(VirtualSocketServerTest, delay_v6) {
868 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
869 DelayTest(ipv6_test_addr);
870}
871
872// Works, receiving socket sees 127.0.0.2.
873TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
874 CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
875 SocketAddress("0.0.0.0", 5000),
876 true);
877}
878
879// Fails.
880TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
881 CrossFamilyConnectionTest(SocketAddress("::2", 0),
882 SocketAddress("0.0.0.0", 5000),
883 false);
884}
885
886// Fails.
887TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
888 CrossFamilyConnectionTest(SocketAddress("::2", 0),
889 SocketAddress("::ffff:127.0.0.1", 5000),
890 false);
891}
892
893// Works. receiving socket sees ::ffff:127.0.0.2.
894TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
895 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
896 SocketAddress("::", 5000),
897 true);
898}
899
900// Fails.
901TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
902 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
903 SocketAddress("::1", 5000),
904 false);
905}
906
907// Works. Receiving socket sees ::ffff:127.0.0.1.
908TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
909 CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
910 SocketAddress("::ffff:127.0.0.2", 5000),
911 true);
912}
913
914// Works, receiving socket sees a result from GetNextIP.
915TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
916 CrossFamilyConnectionTest(SocketAddress("::", 0),
917 SocketAddress("0.0.0.0", 5000),
918 true);
919}
920
921// Works, receiving socket sees whatever GetNextIP gave the client.
922TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
923 CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
924 SocketAddress("::", 5000),
925 true);
926}
927
928TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
929 CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
930 SocketAddress("::", 5000),
931 true);
932}
933
934TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
935 CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
936 SocketAddress("0.0.0.0", 5000),
937 true);
938}
939
940TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
941 CrossFamilyDatagramTest(SocketAddress("::2", 0),
942 SocketAddress("0.0.0.0", 5000),
943 false);
944}
945
946TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
947 CrossFamilyDatagramTest(SocketAddress("::2", 0),
948 SocketAddress("::ffff:127.0.0.1", 5000),
949 false);
950}
951
952TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
953 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
954 SocketAddress("::", 5000),
955 true);
956}
957
958TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
959 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
960 SocketAddress("::1", 5000),
961 false);
962}
963
964TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
965 CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
966 SocketAddress("::ffff:127.0.0.2", 5000),
967 true);
968}
969
970TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
971 CrossFamilyDatagramTest(SocketAddress("::", 0),
972 SocketAddress("0.0.0.0", 5000),
973 true);
974}
975
976TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
977 const uint32 kTestMean[] = { 10, 100, 333, 1000 };
978 const double kTestDev[] = { 0.25, 0.1, 0.01 };
979 // TODO: The current code only works for 1000 data points or more.
980 const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
981 for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
982 for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
983 for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
984 ASSERT_LT(0u, kTestSamples[sidx]);
985 const uint32 kStdDev =
986 static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
987 VirtualSocketServer::Function* f =
988 VirtualSocketServer::CreateDistribution(kTestMean[midx],
989 kStdDev,
990 kTestSamples[sidx]);
991 ASSERT_TRUE(NULL != f);
992 ASSERT_EQ(kTestSamples[sidx], f->size());
993 double sum = 0;
994 for (uint32 i = 0; i < f->size(); ++i) {
995 sum += (*f)[i].second;
996 }
997 const double mean = sum / f->size();
998 double sum_sq_dev = 0;
999 for (uint32 i = 0; i < f->size(); ++i) {
1000 double dev = (*f)[i].second - mean;
1001 sum_sq_dev += dev * dev;
1002 }
1003 const double stddev = std::sqrt(sum_sq_dev / f->size());
1004 EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
1005 << "M=" << kTestMean[midx]
1006 << " SD=" << kStdDev
1007 << " N=" << kTestSamples[sidx];
1008 EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
1009 << "M=" << kTestMean[midx]
1010 << " SD=" << kStdDev
1011 << " N=" << kTestSamples[sidx];
1012 delete f;
1013 }
1014 }
1015 }
1016}