blob: 617a57f297115f01af7388d64c1f3d1d171c3337 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2006, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include <time.h>
29#ifdef POSIX
30#include <netinet/in.h>
31#endif
32#include <cmath>
33
34#include "talk/base/logging.h"
35#include "talk/base/gunit.h"
36#include "talk/base/testclient.h"
37#include "talk/base/testutils.h"
38#include "talk/base/thread.h"
39#include "talk/base/timeutils.h"
40#include "talk/base/virtualsocketserver.h"
41
42using namespace talk_base;
43
44// Sends at a constant rate but with random packet sizes.
45struct Sender : public MessageHandler {
46 Sender(Thread* th, AsyncSocket* s, uint32 rt)
47 : thread(th), socket(new AsyncUDPSocket(s)),
48 done(false), rate(rt), count(0) {
49 last_send = Time();
50 thread->PostDelayed(NextDelay(), this, 1);
51 }
52
53 uint32 NextDelay() {
54 uint32 size = (rand() % 4096) + 1;
55 return 1000 * size / rate;
56 }
57
58 void OnMessage(Message* pmsg) {
59 ASSERT_EQ(1u, pmsg->message_id);
60
61 if (done)
62 return;
63
64 uint32 cur_time = Time();
65 uint32 delay = cur_time - last_send;
66 uint32 size = rate * delay / 1000;
67 size = std::min<uint32>(size, 4096);
68 size = std::max<uint32>(size, sizeof(uint32));
69
70 count += size;
71 memcpy(dummy, &cur_time, sizeof(cur_time));
72 socket->Send(dummy, size);
73
74 last_send = cur_time;
75 thread->PostDelayed(NextDelay(), this, 1);
76 }
77
78 Thread* thread;
79 scoped_ptr<AsyncUDPSocket> socket;
80 bool done;
81 uint32 rate; // bytes per second
82 uint32 count;
83 uint32 last_send;
84 char dummy[4096];
85};
86
87struct Receiver : public MessageHandler, public sigslot::has_slots<> {
88 Receiver(Thread* th, AsyncSocket* s, uint32 bw)
89 : thread(th), socket(new AsyncUDPSocket(s)), bandwidth(bw), done(false),
90 count(0), sec_count(0), sum(0), sum_sq(0), samples(0) {
91 socket->SignalReadPacket.connect(this, &Receiver::OnReadPacket);
92 thread->PostDelayed(1000, this, 1);
93 }
94
95 ~Receiver() {
96 thread->Clear(this);
97 }
98
99 void OnReadPacket(AsyncPacketSocket* s, const char* data, size_t size,
100 const SocketAddress& remote_addr) {
101 ASSERT_EQ(socket.get(), s);
102 ASSERT_GE(size, 4U);
103
104 count += size;
105 sec_count += size;
106
107 uint32 send_time = *reinterpret_cast<const uint32*>(data);
108 uint32 recv_time = Time();
109 uint32 delay = recv_time - send_time;
110 sum += delay;
111 sum_sq += delay * delay;
112 samples += 1;
113 }
114
115 void OnMessage(Message* pmsg) {
116 ASSERT_EQ(1u, pmsg->message_id);
117
118 if (done)
119 return;
120
121 // It is always possible for us to receive more than expected because
122 // packets can be further delayed in delivery.
123 if (bandwidth > 0)
124 ASSERT_TRUE(sec_count <= 5 * bandwidth / 4);
125 sec_count = 0;
126 thread->PostDelayed(1000, this, 1);
127 }
128
129 Thread* thread;
130 scoped_ptr<AsyncUDPSocket> socket;
131 uint32 bandwidth;
132 bool done;
133 size_t count;
134 size_t sec_count;
135 double sum;
136 double sum_sq;
137 uint32 samples;
138};
139
140class VirtualSocketServerTest : public testing::Test {
141 public:
142 VirtualSocketServerTest() : ss_(new VirtualSocketServer(NULL)),
143 kIPv4AnyAddress(IPAddress(INADDR_ANY), 0),
144 kIPv6AnyAddress(IPAddress(in6addr_any), 0) {
145 }
146
147 void CheckAddressIncrementalization(const SocketAddress& post,
148 const SocketAddress& pre) {
149 EXPECT_EQ(post.port(), pre.port() + 1);
150 IPAddress post_ip = post.ipaddr();
151 IPAddress pre_ip = pre.ipaddr();
152 EXPECT_EQ(pre_ip.family(), post_ip.family());
153 if (post_ip.family() == AF_INET) {
154 in_addr pre_ipv4 = pre_ip.ipv4_address();
155 in_addr post_ipv4 = post_ip.ipv4_address();
156 int difference = ntohl(post_ipv4.s_addr) - ntohl(pre_ipv4.s_addr);
157 EXPECT_EQ(1, difference);
158 } else if (post_ip.family() == AF_INET6) {
159 in6_addr post_ip6 = post_ip.ipv6_address();
160 in6_addr pre_ip6 = pre_ip.ipv6_address();
161 uint32* post_as_ints = reinterpret_cast<uint32*>(&post_ip6.s6_addr);
162 uint32* pre_as_ints = reinterpret_cast<uint32*>(&pre_ip6.s6_addr);
163 EXPECT_EQ(post_as_ints[3], pre_as_ints[3] + 1);
164 }
165 }
166
167 void BasicTest(const SocketAddress& initial_addr) {
168 AsyncSocket* socket = ss_->CreateAsyncSocket(initial_addr.family(),
169 SOCK_DGRAM);
170 socket->Bind(initial_addr);
171 SocketAddress server_addr = socket->GetLocalAddress();
172 // Make sure VSS didn't switch families on us.
173 EXPECT_EQ(server_addr.family(), initial_addr.family());
174
175 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
176 AsyncSocket* socket2 =
177 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
178 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
179
180 SocketAddress client2_addr;
181 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
182 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
183
184 SocketAddress client1_addr;
185 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
186 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
187 EXPECT_EQ(client1_addr, server_addr);
188
189 SocketAddress empty = EmptySocketAddressWithFamily(initial_addr.family());
190 for (int i = 0; i < 10; i++) {
191 client2 = new TestClient(AsyncUDPSocket::Create(ss_, empty));
192
193 SocketAddress next_client2_addr;
194 EXPECT_EQ(3, client2->SendTo("foo", 3, server_addr));
195 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &next_client2_addr));
196 CheckAddressIncrementalization(next_client2_addr, client2_addr);
197 // EXPECT_EQ(next_client2_addr.port(), client2_addr.port() + 1);
198
199 SocketAddress server_addr2;
200 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, next_client2_addr));
201 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &server_addr2));
202 EXPECT_EQ(server_addr2, server_addr);
203
204 client2_addr = next_client2_addr;
205 }
206 }
207
208 // initial_addr should be made from either INADDR_ANY or in6addr_any.
209 void ConnectTest(const SocketAddress& initial_addr) {
210 testing::StreamSink sink;
211 SocketAddress accept_addr;
212 const SocketAddress kEmptyAddr =
213 EmptySocketAddressWithFamily(initial_addr.family());
214
215 // Create client
216 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
217 SOCK_STREAM);
218 sink.Monitor(client);
219 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
220 EXPECT_TRUE(client->GetLocalAddress().IsNil());
221
222 // Create server
223 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
224 SOCK_STREAM);
225 sink.Monitor(server);
226 EXPECT_NE(0, server->Listen(5)); // Bind required
227 EXPECT_EQ(0, server->Bind(initial_addr));
228 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
229 EXPECT_EQ(0, server->Listen(5));
230 EXPECT_EQ(server->GetState(), AsyncSocket::CS_CONNECTING);
231
232 // No pending server connections
233 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
234 EXPECT_TRUE(NULL == server->Accept(&accept_addr));
235 EXPECT_EQ(AF_UNSPEC, accept_addr.family());
236
237 // Attempt connect to listening socket
238 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
239 EXPECT_NE(client->GetLocalAddress(), kEmptyAddr); // Implicit Bind
240 EXPECT_NE(AF_UNSPEC, client->GetLocalAddress().family()); // Implicit Bind
241 EXPECT_NE(client->GetLocalAddress(), server->GetLocalAddress());
242
243 // Client is connecting
244 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
245 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
246 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
247
248 ss_->ProcessMessagesUntilIdle();
249
250 // Client still connecting
251 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
252 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
253 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
254
255 // Server has pending connection
256 EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
257 Socket* accepted = server->Accept(&accept_addr);
258 EXPECT_TRUE(NULL != accepted);
259 EXPECT_NE(accept_addr, kEmptyAddr);
260 EXPECT_EQ(accepted->GetRemoteAddress(), accept_addr);
261
262 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
263 EXPECT_EQ(accepted->GetLocalAddress(), server->GetLocalAddress());
264 EXPECT_EQ(accepted->GetRemoteAddress(), client->GetLocalAddress());
265
266 ss_->ProcessMessagesUntilIdle();
267
268 // Client has connected
269 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTED);
270 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
271 EXPECT_FALSE(sink.Check(client, testing::SSE_CLOSE));
272 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
273 EXPECT_EQ(client->GetRemoteAddress(), accepted->GetLocalAddress());
274 }
275
276 void ConnectToNonListenerTest(const SocketAddress& initial_addr) {
277 testing::StreamSink sink;
278 SocketAddress accept_addr;
279 const SocketAddress nil_addr;
280 const SocketAddress empty_addr =
281 EmptySocketAddressWithFamily(initial_addr.family());
282
283 // Create client
284 AsyncSocket* client = ss_->CreateAsyncSocket(initial_addr.family(),
285 SOCK_STREAM);
286 sink.Monitor(client);
287
288 // Create server
289 AsyncSocket* server = ss_->CreateAsyncSocket(initial_addr.family(),
290 SOCK_STREAM);
291 sink.Monitor(server);
292 EXPECT_EQ(0, server->Bind(initial_addr));
293 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
294 // Attempt connect to non-listening socket
295 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
296
297 ss_->ProcessMessagesUntilIdle();
298
299 // No pending server connections
300 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
301 EXPECT_TRUE(NULL == server->Accept(&accept_addr));
302 EXPECT_EQ(accept_addr, nil_addr);
303
304 // Connection failed
305 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
306 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
307 EXPECT_TRUE(sink.Check(client, testing::SSE_ERROR));
308 EXPECT_EQ(client->GetRemoteAddress(), nil_addr);
309 }
310
311 void CloseDuringConnectTest(const SocketAddress& initial_addr) {
312 testing::StreamSink sink;
313 SocketAddress accept_addr;
314 const SocketAddress empty_addr =
315 EmptySocketAddressWithFamily(initial_addr.family());
316
317 // Create client and server
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000318 scoped_ptr<AsyncSocket> client(ss_->CreateAsyncSocket(initial_addr.family(),
319 SOCK_STREAM));
320 sink.Monitor(client.get());
321 scoped_ptr<AsyncSocket> server(ss_->CreateAsyncSocket(initial_addr.family(),
322 SOCK_STREAM));
323 sink.Monitor(server.get());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000324
325 // Initiate connect
326 EXPECT_EQ(0, server->Bind(initial_addr));
327 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
328
329 EXPECT_EQ(0, server->Listen(5));
330 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
331
332 // Server close before socket enters accept queue
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000333 EXPECT_FALSE(sink.Check(server.get(), testing::SSE_READ));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000334 server->Close();
335
336 ss_->ProcessMessagesUntilIdle();
337
338 // Result: connection failed
339 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000340 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000341
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000342 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
343 sink.Monitor(server.get());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000344
345 // Initiate connect
346 EXPECT_EQ(0, server->Bind(initial_addr));
347 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
348
349 EXPECT_EQ(0, server->Listen(5));
350 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
351
352 ss_->ProcessMessagesUntilIdle();
353
354 // Server close while socket is in accept queue
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000355 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000356 server->Close();
357
358 ss_->ProcessMessagesUntilIdle();
359
360 // Result: connection failed
361 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000362 EXPECT_TRUE(sink.Check(client.get(), testing::SSE_ERROR));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000363
364 // New server
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000365 server.reset(ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM));
366 sink.Monitor(server.get());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000367
368 // Initiate connect
369 EXPECT_EQ(0, server->Bind(initial_addr));
370 EXPECT_EQ(server->GetLocalAddress().family(), initial_addr.family());
371
372 EXPECT_EQ(0, server->Listen(5));
373 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
374
375 ss_->ProcessMessagesUntilIdle();
376
377 // Server accepts connection
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000378 EXPECT_TRUE(sink.Check(server.get(), testing::SSE_READ));
379 scoped_ptr<AsyncSocket> accepted(server->Accept(&accept_addr));
380 ASSERT_TRUE(NULL != accepted.get());
381 sink.Monitor(accepted.get());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000382
383 // Client closes before connection complets
384 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CONNECTED);
385
386 // Connected message has not been processed yet.
387 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CONNECTING);
388 client->Close();
389
390 ss_->ProcessMessagesUntilIdle();
391
392 // Result: accepted socket closes
393 EXPECT_EQ(accepted->GetState(), AsyncSocket::CS_CLOSED);
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000394 EXPECT_TRUE(sink.Check(accepted.get(), testing::SSE_CLOSE));
395 EXPECT_FALSE(sink.Check(client.get(), testing::SSE_CLOSE));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000396 }
397
398 void CloseTest(const SocketAddress& initial_addr) {
399 testing::StreamSink sink;
400 const SocketAddress kEmptyAddr;
401
402 // Create clients
403 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
404 sink.Monitor(a);
405 a->Bind(initial_addr);
406 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
407
408
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000409 scoped_ptr<AsyncSocket> b(ss_->CreateAsyncSocket(initial_addr.family(),
410 SOCK_STREAM));
411 sink.Monitor(b.get());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000412 b->Bind(initial_addr);
413 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
414
415 EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
416 EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
417
418 ss_->ProcessMessagesUntilIdle();
419
420 EXPECT_TRUE(sink.Check(a, testing::SSE_OPEN));
421 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CONNECTED);
422 EXPECT_EQ(a->GetRemoteAddress(), b->GetLocalAddress());
423
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000424 EXPECT_TRUE(sink.Check(b.get(), testing::SSE_OPEN));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000425 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CONNECTED);
426 EXPECT_EQ(b->GetRemoteAddress(), a->GetLocalAddress());
427
428 EXPECT_EQ(1, a->Send("a", 1));
429 b->Close();
430 EXPECT_EQ(1, a->Send("b", 1));
431
432 ss_->ProcessMessagesUntilIdle();
433
434 char buffer[10];
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000435 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_READ));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000436 EXPECT_EQ(-1, b->Recv(buffer, 10));
437
438 EXPECT_TRUE(sink.Check(a, testing::SSE_CLOSE));
439 EXPECT_EQ(a->GetState(), AsyncSocket::CS_CLOSED);
440 EXPECT_EQ(a->GetRemoteAddress(), kEmptyAddr);
441
wu@webrtc.org97d1a982013-08-13 00:13:26 +0000442 // No signal for Closer
443 EXPECT_FALSE(sink.Check(b.get(), testing::SSE_CLOSE));
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000444 EXPECT_EQ(b->GetState(), AsyncSocket::CS_CLOSED);
445 EXPECT_EQ(b->GetRemoteAddress(), kEmptyAddr);
446 }
447
448 void TcpSendTest(const SocketAddress& initial_addr) {
449 testing::StreamSink sink;
450 const SocketAddress kEmptyAddr;
451
452 // Connect two sockets
453 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
454 sink.Monitor(a);
455 a->Bind(initial_addr);
456 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
457
458 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(), SOCK_STREAM);
459 sink.Monitor(b);
460 b->Bind(initial_addr);
461 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
462
463 EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
464 EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
465
466 ss_->ProcessMessagesUntilIdle();
467
468 const size_t kBufferSize = 2000;
469 ss_->set_send_buffer_capacity(kBufferSize);
470 ss_->set_recv_buffer_capacity(kBufferSize);
471
472 const size_t kDataSize = 5000;
473 char send_buffer[kDataSize], recv_buffer[kDataSize];
474 for (size_t i = 0; i < kDataSize; ++i)
475 send_buffer[i] = static_cast<char>(i % 256);
476 memset(recv_buffer, 0, sizeof(recv_buffer));
477 size_t send_pos = 0, recv_pos = 0;
478
479 // Can't send more than send buffer in one write
480 int result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
481 EXPECT_EQ(static_cast<int>(kBufferSize), result);
482 send_pos += result;
483
484 ss_->ProcessMessagesUntilIdle();
485 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
486 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
487
488 // Receive buffer is already filled, fill send buffer again
489 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
490 EXPECT_EQ(static_cast<int>(kBufferSize), result);
491 send_pos += result;
492
493 ss_->ProcessMessagesUntilIdle();
494 EXPECT_FALSE(sink.Check(a, testing::SSE_WRITE));
495 EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
496
497 // No more room in send or receive buffer
498 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
499 EXPECT_EQ(-1, result);
500 EXPECT_TRUE(a->IsBlocking());
501
502 // Read a subset of the data
503 result = b->Recv(recv_buffer + recv_pos, 500);
504 EXPECT_EQ(500, result);
505 recv_pos += result;
506
507 ss_->ProcessMessagesUntilIdle();
508 EXPECT_TRUE(sink.Check(a, testing::SSE_WRITE));
509 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
510
511 // Room for more on the sending side
512 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
513 EXPECT_EQ(500, result);
514 send_pos += result;
515
516 // Empty the recv buffer
517 while (true) {
518 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
519 if (result < 0) {
520 EXPECT_EQ(-1, result);
521 EXPECT_TRUE(b->IsBlocking());
522 break;
523 }
524 recv_pos += result;
525 }
526
527 ss_->ProcessMessagesUntilIdle();
528 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
529
530 // Continue to empty the recv buffer
531 while (true) {
532 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
533 if (result < 0) {
534 EXPECT_EQ(-1, result);
535 EXPECT_TRUE(b->IsBlocking());
536 break;
537 }
538 recv_pos += result;
539 }
540
541 // Send last of the data
542 result = a->Send(send_buffer + send_pos, kDataSize - send_pos);
543 EXPECT_EQ(500, result);
544 send_pos += result;
545
546 ss_->ProcessMessagesUntilIdle();
547 EXPECT_TRUE(sink.Check(b, testing::SSE_READ));
548
549 // Receive the last of the data
550 while (true) {
551 result = b->Recv(recv_buffer + recv_pos, kDataSize - recv_pos);
552 if (result < 0) {
553 EXPECT_EQ(-1, result);
554 EXPECT_TRUE(b->IsBlocking());
555 break;
556 }
557 recv_pos += result;
558 }
559
560 ss_->ProcessMessagesUntilIdle();
561 EXPECT_FALSE(sink.Check(b, testing::SSE_READ));
562
563 // The received data matches the sent data
564 EXPECT_EQ(kDataSize, send_pos);
565 EXPECT_EQ(kDataSize, recv_pos);
566 EXPECT_EQ(0, memcmp(recv_buffer, send_buffer, kDataSize));
567 }
568
569 void TcpSendsPacketsInOrderTest(const SocketAddress& initial_addr) {
570 const SocketAddress kEmptyAddr;
571
572 // Connect two sockets
573 AsyncSocket* a = ss_->CreateAsyncSocket(initial_addr.family(),
574 SOCK_STREAM);
575 AsyncSocket* b = ss_->CreateAsyncSocket(initial_addr.family(),
576 SOCK_STREAM);
577 a->Bind(initial_addr);
578 EXPECT_EQ(a->GetLocalAddress().family(), initial_addr.family());
579
580 b->Bind(initial_addr);
581 EXPECT_EQ(b->GetLocalAddress().family(), initial_addr.family());
582
583 EXPECT_EQ(0, a->Connect(b->GetLocalAddress()));
584 EXPECT_EQ(0, b->Connect(a->GetLocalAddress()));
585 ss_->ProcessMessagesUntilIdle();
586
587 // First, deliver all packets in 0 ms.
588 char buffer[2] = { 0, 0 };
589 const char cNumPackets = 10;
590 for (char i = 0; i < cNumPackets; ++i) {
591 buffer[0] = '0' + i;
592 EXPECT_EQ(1, a->Send(buffer, 1));
593 }
594
595 ss_->ProcessMessagesUntilIdle();
596
597 for (char i = 0; i < cNumPackets; ++i) {
598 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
599 EXPECT_EQ(static_cast<char>('0' + i), buffer[0]);
600 }
601
602 // Next, deliver packets at random intervals
603 const uint32 mean = 50;
604 const uint32 stddev = 50;
605
606 ss_->set_delay_mean(mean);
607 ss_->set_delay_stddev(stddev);
608 ss_->UpdateDelayDistribution();
609
610 for (char i = 0; i < cNumPackets; ++i) {
611 buffer[0] = 'A' + i;
612 EXPECT_EQ(1, a->Send(buffer, 1));
613 }
614
615 ss_->ProcessMessagesUntilIdle();
616
617 for (char i = 0; i < cNumPackets; ++i) {
618 EXPECT_EQ(1, b->Recv(buffer, sizeof(buffer)));
619 EXPECT_EQ(static_cast<char>('A' + i), buffer[0]);
620 }
621 }
622
623 void BandwidthTest(const SocketAddress& initial_addr) {
624 AsyncSocket* send_socket =
625 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
626 AsyncSocket* recv_socket =
627 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
628 ASSERT_EQ(0, send_socket->Bind(initial_addr));
629 ASSERT_EQ(0, recv_socket->Bind(initial_addr));
630 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
631 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
632 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
633
634 uint32 bandwidth = 64 * 1024;
635 ss_->set_bandwidth(bandwidth);
636
637 Thread* pthMain = Thread::Current();
638 Sender sender(pthMain, send_socket, 80 * 1024);
639 Receiver receiver(pthMain, recv_socket, bandwidth);
640
641 pthMain->ProcessMessages(5000);
642 sender.done = true;
643 pthMain->ProcessMessages(5000);
644
645 ASSERT_TRUE(receiver.count >= 5 * 3 * bandwidth / 4);
646 ASSERT_TRUE(receiver.count <= 6 * bandwidth); // queue could drain for 1s
647
648 ss_->set_bandwidth(0);
649 }
650
651 void DelayTest(const SocketAddress& initial_addr) {
652 time_t seed = ::time(NULL);
653 LOG(LS_VERBOSE) << "seed = " << seed;
654 srand(static_cast<unsigned int>(seed));
655
656 const uint32 mean = 2000;
657 const uint32 stddev = 500;
658
659 ss_->set_delay_mean(mean);
660 ss_->set_delay_stddev(stddev);
661 ss_->UpdateDelayDistribution();
662
663 AsyncSocket* send_socket =
664 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
665 AsyncSocket* recv_socket =
666 ss_->CreateAsyncSocket(initial_addr.family(), SOCK_DGRAM);
667 ASSERT_EQ(0, send_socket->Bind(initial_addr));
668 ASSERT_EQ(0, recv_socket->Bind(initial_addr));
669 EXPECT_EQ(send_socket->GetLocalAddress().family(), initial_addr.family());
670 EXPECT_EQ(recv_socket->GetLocalAddress().family(), initial_addr.family());
671 ASSERT_EQ(0, send_socket->Connect(recv_socket->GetLocalAddress()));
672
673 Thread* pthMain = Thread::Current();
674 // Avg packet size is 2K, so at 200KB/s for 10s, we should see about
675 // 1000 packets, which is necessary to get a good distribution.
676 Sender sender(pthMain, send_socket, 100 * 2 * 1024);
677 Receiver receiver(pthMain, recv_socket, 0);
678
679 pthMain->ProcessMessages(10000);
680 sender.done = receiver.done = true;
681 ss_->ProcessMessagesUntilIdle();
682
683 const double sample_mean = receiver.sum / receiver.samples;
684 double num =
685 receiver.samples * receiver.sum_sq - receiver.sum * receiver.sum;
686 double den = receiver.samples * (receiver.samples - 1);
687 const double sample_stddev = std::sqrt(num / den);
688 LOG(LS_VERBOSE) << "mean=" << sample_mean << " stddev=" << sample_stddev;
689
690 EXPECT_LE(500u, receiver.samples);
691 // We initially used a 0.1 fudge factor, but on the build machine, we
692 // have seen the value differ by as much as 0.13.
693 EXPECT_NEAR(mean, sample_mean, 0.15 * mean);
694 EXPECT_NEAR(stddev, sample_stddev, 0.15 * stddev);
695
696 ss_->set_delay_mean(0);
697 ss_->set_delay_stddev(0);
698 ss_->UpdateDelayDistribution();
699 }
700
701 // Test cross-family communication between a client bound to client_addr and a
702 // server bound to server_addr. shouldSucceed indicates if communication is
703 // expected to work or not.
704 void CrossFamilyConnectionTest(const SocketAddress& client_addr,
705 const SocketAddress& server_addr,
706 bool shouldSucceed) {
707 testing::StreamSink sink;
708 SocketAddress accept_address;
709 const SocketAddress kEmptyAddr;
710
711 // Client gets a IPv4 address
712 AsyncSocket* client = ss_->CreateAsyncSocket(client_addr.family(),
713 SOCK_STREAM);
714 sink.Monitor(client);
715 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
716 EXPECT_EQ(client->GetLocalAddress(), kEmptyAddr);
717 client->Bind(client_addr);
718
719 // Server gets a non-mapped non-any IPv6 address.
720 // IPv4 sockets should not be able to connect to this.
721 AsyncSocket* server = ss_->CreateAsyncSocket(server_addr.family(),
722 SOCK_STREAM);
723 sink.Monitor(server);
724 server->Bind(server_addr);
725 server->Listen(5);
726
727 if (shouldSucceed) {
728 EXPECT_EQ(0, client->Connect(server->GetLocalAddress()));
729 ss_->ProcessMessagesUntilIdle();
730 EXPECT_TRUE(sink.Check(server, testing::SSE_READ));
731 Socket* accepted = server->Accept(&accept_address);
732 EXPECT_TRUE(NULL != accepted);
733 EXPECT_NE(kEmptyAddr, accept_address);
734 ss_->ProcessMessagesUntilIdle();
735 EXPECT_TRUE(sink.Check(client, testing::SSE_OPEN));
736 EXPECT_EQ(client->GetRemoteAddress(), server->GetLocalAddress());
737 } else {
738 // Check that the connection failed.
739 EXPECT_EQ(-1, client->Connect(server->GetLocalAddress()));
740 ss_->ProcessMessagesUntilIdle();
741
742 EXPECT_FALSE(sink.Check(server, testing::SSE_READ));
743 EXPECT_TRUE(NULL == server->Accept(&accept_address));
744 EXPECT_EQ(accept_address, kEmptyAddr);
745 EXPECT_EQ(client->GetState(), AsyncSocket::CS_CLOSED);
746 EXPECT_FALSE(sink.Check(client, testing::SSE_OPEN));
747 EXPECT_EQ(client->GetRemoteAddress(), kEmptyAddr);
748 }
749 }
750
751 // Test cross-family datagram sending between a client bound to client_addr
752 // and a server bound to server_addr. shouldSucceed indicates if sending is
753 // expected to succed or not.
754 void CrossFamilyDatagramTest(const SocketAddress& client_addr,
755 const SocketAddress& server_addr,
756 bool shouldSucceed) {
757 AsyncSocket* socket = ss_->CreateAsyncSocket(SOCK_DGRAM);
758 socket->Bind(server_addr);
759 SocketAddress bound_server_addr = socket->GetLocalAddress();
760 TestClient* client1 = new TestClient(new AsyncUDPSocket(socket));
761
762 AsyncSocket* socket2 = ss_->CreateAsyncSocket(SOCK_DGRAM);
763 socket2->Bind(client_addr);
764 TestClient* client2 = new TestClient(new AsyncUDPSocket(socket2));
765 SocketAddress client2_addr;
766
767 if (shouldSucceed) {
768 EXPECT_EQ(3, client2->SendTo("foo", 3, bound_server_addr));
769 EXPECT_TRUE(client1->CheckNextPacket("foo", 3, &client2_addr));
770 SocketAddress client1_addr;
771 EXPECT_EQ(6, client1->SendTo("bizbaz", 6, client2_addr));
772 EXPECT_TRUE(client2->CheckNextPacket("bizbaz", 6, &client1_addr));
773 EXPECT_EQ(client1_addr, bound_server_addr);
774 } else {
775 EXPECT_EQ(-1, client2->SendTo("foo", 3, bound_server_addr));
776 EXPECT_FALSE(client1->CheckNextPacket("foo", 3, 0));
777 }
778 }
779
780 protected:
781 virtual void SetUp() {
782 Thread::Current()->set_socketserver(ss_);
783 }
784 virtual void TearDown() {
785 Thread::Current()->set_socketserver(NULL);
786 }
787
788 VirtualSocketServer* ss_;
789 const SocketAddress kIPv4AnyAddress;
790 const SocketAddress kIPv6AnyAddress;
791};
792
793TEST_F(VirtualSocketServerTest, basic_v4) {
794 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 5000);
795 BasicTest(ipv4_test_addr);
796}
797
798TEST_F(VirtualSocketServerTest, basic_v6) {
799 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 5000);
800 BasicTest(ipv6_test_addr);
801}
802
803TEST_F(VirtualSocketServerTest, connect_v4) {
804 ConnectTest(kIPv4AnyAddress);
805}
806
807TEST_F(VirtualSocketServerTest, connect_v6) {
808 ConnectTest(kIPv6AnyAddress);
809}
810
811TEST_F(VirtualSocketServerTest, connect_to_non_listener_v4) {
812 ConnectToNonListenerTest(kIPv4AnyAddress);
813}
814
815TEST_F(VirtualSocketServerTest, connect_to_non_listener_v6) {
816 ConnectToNonListenerTest(kIPv6AnyAddress);
817}
818
819TEST_F(VirtualSocketServerTest, close_during_connect_v4) {
820 CloseDuringConnectTest(kIPv4AnyAddress);
821}
822
823TEST_F(VirtualSocketServerTest, close_during_connect_v6) {
824 CloseDuringConnectTest(kIPv6AnyAddress);
825}
826
827TEST_F(VirtualSocketServerTest, close_v4) {
828 CloseTest(kIPv4AnyAddress);
829}
830
831TEST_F(VirtualSocketServerTest, close_v6) {
832 CloseTest(kIPv6AnyAddress);
833}
834
835TEST_F(VirtualSocketServerTest, tcp_send_v4) {
836 TcpSendTest(kIPv4AnyAddress);
837}
838
839TEST_F(VirtualSocketServerTest, tcp_send_v6) {
840 TcpSendTest(kIPv6AnyAddress);
841}
842
843TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v4) {
844 TcpSendsPacketsInOrderTest(kIPv4AnyAddress);
845}
846
847TEST_F(VirtualSocketServerTest, TcpSendsPacketsInOrder_v6) {
848 TcpSendsPacketsInOrderTest(kIPv6AnyAddress);
849}
850
851TEST_F(VirtualSocketServerTest, bandwidth_v4) {
852 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
853 BandwidthTest(ipv4_test_addr);
854}
855
856TEST_F(VirtualSocketServerTest, bandwidth_v6) {
857 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
858 BandwidthTest(ipv6_test_addr);
859}
860
861TEST_F(VirtualSocketServerTest, delay_v4) {
862 SocketAddress ipv4_test_addr(IPAddress(INADDR_ANY), 1000);
863 DelayTest(ipv4_test_addr);
864}
865
866TEST_F(VirtualSocketServerTest, delay_v6) {
867 SocketAddress ipv6_test_addr(IPAddress(in6addr_any), 1000);
868 DelayTest(ipv6_test_addr);
869}
870
871// Works, receiving socket sees 127.0.0.2.
872TEST_F(VirtualSocketServerTest, CanConnectFromMappedIPv6ToIPv4Any) {
873 CrossFamilyConnectionTest(SocketAddress("::ffff:127.0.0.2", 0),
874 SocketAddress("0.0.0.0", 5000),
875 true);
876}
877
878// Fails.
879TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToIPv4Any) {
880 CrossFamilyConnectionTest(SocketAddress("::2", 0),
881 SocketAddress("0.0.0.0", 5000),
882 false);
883}
884
885// Fails.
886TEST_F(VirtualSocketServerTest, CantConnectFromUnMappedIPv6ToMappedIPv6) {
887 CrossFamilyConnectionTest(SocketAddress("::2", 0),
888 SocketAddress("::ffff:127.0.0.1", 5000),
889 false);
890}
891
892// Works. receiving socket sees ::ffff:127.0.0.2.
893TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToIPv6Any) {
894 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
895 SocketAddress("::", 5000),
896 true);
897}
898
899// Fails.
900TEST_F(VirtualSocketServerTest, CantConnectFromIPv4ToUnMappedIPv6) {
901 CrossFamilyConnectionTest(SocketAddress("127.0.0.2", 0),
902 SocketAddress("::1", 5000),
903 false);
904}
905
906// Works. Receiving socket sees ::ffff:127.0.0.1.
907TEST_F(VirtualSocketServerTest, CanConnectFromIPv4ToMappedIPv6) {
908 CrossFamilyConnectionTest(SocketAddress("127.0.0.1", 0),
909 SocketAddress("::ffff:127.0.0.2", 5000),
910 true);
911}
912
913// Works, receiving socket sees a result from GetNextIP.
914TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv6ToIPv4Any) {
915 CrossFamilyConnectionTest(SocketAddress("::", 0),
916 SocketAddress("0.0.0.0", 5000),
917 true);
918}
919
920// Works, receiving socket sees whatever GetNextIP gave the client.
921TEST_F(VirtualSocketServerTest, CanConnectFromUnboundIPv4ToIPv6Any) {
922 CrossFamilyConnectionTest(SocketAddress("0.0.0.0", 0),
923 SocketAddress("::", 5000),
924 true);
925}
926
927TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv4ToIPv6Any) {
928 CrossFamilyDatagramTest(SocketAddress("0.0.0.0", 0),
929 SocketAddress("::", 5000),
930 true);
931}
932
933TEST_F(VirtualSocketServerTest, CanSendDatagramFromMappedIPv6ToIPv4Any) {
934 CrossFamilyDatagramTest(SocketAddress("::ffff:127.0.0.1", 0),
935 SocketAddress("0.0.0.0", 5000),
936 true);
937}
938
939TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToIPv4Any) {
940 CrossFamilyDatagramTest(SocketAddress("::2", 0),
941 SocketAddress("0.0.0.0", 5000),
942 false);
943}
944
945TEST_F(VirtualSocketServerTest, CantSendDatagramFromUnMappedIPv6ToMappedIPv6) {
946 CrossFamilyDatagramTest(SocketAddress("::2", 0),
947 SocketAddress("::ffff:127.0.0.1", 5000),
948 false);
949}
950
951TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToIPv6Any) {
952 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
953 SocketAddress("::", 5000),
954 true);
955}
956
957TEST_F(VirtualSocketServerTest, CantSendDatagramFromIPv4ToUnMappedIPv6) {
958 CrossFamilyDatagramTest(SocketAddress("127.0.0.2", 0),
959 SocketAddress("::1", 5000),
960 false);
961}
962
963TEST_F(VirtualSocketServerTest, CanSendDatagramFromIPv4ToMappedIPv6) {
964 CrossFamilyDatagramTest(SocketAddress("127.0.0.1", 0),
965 SocketAddress("::ffff:127.0.0.2", 5000),
966 true);
967}
968
969TEST_F(VirtualSocketServerTest, CanSendDatagramFromUnboundIPv6ToIPv4Any) {
970 CrossFamilyDatagramTest(SocketAddress("::", 0),
971 SocketAddress("0.0.0.0", 5000),
972 true);
973}
974
975TEST_F(VirtualSocketServerTest, CreatesStandardDistribution) {
976 const uint32 kTestMean[] = { 10, 100, 333, 1000 };
977 const double kTestDev[] = { 0.25, 0.1, 0.01 };
978 // TODO: The current code only works for 1000 data points or more.
979 const uint32 kTestSamples[] = { /*10, 100,*/ 1000 };
980 for (size_t midx = 0; midx < ARRAY_SIZE(kTestMean); ++midx) {
981 for (size_t didx = 0; didx < ARRAY_SIZE(kTestDev); ++didx) {
982 for (size_t sidx = 0; sidx < ARRAY_SIZE(kTestSamples); ++sidx) {
983 ASSERT_LT(0u, kTestSamples[sidx]);
984 const uint32 kStdDev =
985 static_cast<uint32>(kTestDev[didx] * kTestMean[midx]);
986 VirtualSocketServer::Function* f =
987 VirtualSocketServer::CreateDistribution(kTestMean[midx],
988 kStdDev,
989 kTestSamples[sidx]);
990 ASSERT_TRUE(NULL != f);
991 ASSERT_EQ(kTestSamples[sidx], f->size());
992 double sum = 0;
993 for (uint32 i = 0; i < f->size(); ++i) {
994 sum += (*f)[i].second;
995 }
996 const double mean = sum / f->size();
997 double sum_sq_dev = 0;
998 for (uint32 i = 0; i < f->size(); ++i) {
999 double dev = (*f)[i].second - mean;
1000 sum_sq_dev += dev * dev;
1001 }
1002 const double stddev = std::sqrt(sum_sq_dev / f->size());
1003 EXPECT_NEAR(kTestMean[midx], mean, 0.1 * kTestMean[midx])
1004 << "M=" << kTestMean[midx]
1005 << " SD=" << kStdDev
1006 << " N=" << kTestSamples[sidx];
1007 EXPECT_NEAR(kStdDev, stddev, 0.1 * kStdDev)
1008 << "M=" << kTestMean[midx]
1009 << " SD=" << kStdDev
1010 << " N=" << kTestSamples[sidx];
1011 delete f;
1012 }
1013 }
1014 }
1015}