blob: 43be440e7bd6819209be1bfb37e2cd62267d774c [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#if defined(_MSC_VER) && _MSC_VER < 1300
29#pragma warning(disable:4786)
30#endif
31
32#include <cassert>
33
34#ifdef POSIX
35#include <string.h>
36#include <errno.h>
37#include <fcntl.h>
38#include <sys/time.h>
39#include <unistd.h>
40#include <signal.h>
41#endif
42
43#ifdef WIN32
44#define WIN32_LEAN_AND_MEAN
45#include <windows.h>
46#include <winsock2.h>
47#include <ws2tcpip.h>
48#undef SetPort
49#endif
50
51#include <algorithm>
52#include <map>
53
54#include "talk/base/basictypes.h"
55#include "talk/base/byteorder.h"
56#include "talk/base/common.h"
57#include "talk/base/logging.h"
58#include "talk/base/nethelpers.h"
59#include "talk/base/physicalsocketserver.h"
60#include "talk/base/timeutils.h"
61#include "talk/base/winping.h"
62#include "talk/base/win32socketinit.h"
63
64// stm: this will tell us if we are on OSX
65#ifdef HAVE_CONFIG_H
66#include "config.h"
67#endif
68
69#ifdef POSIX
70#include <netinet/tcp.h> // for TCP_NODELAY
71#define IP_MTU 14 // Until this is integrated from linux/in.h to netinet/in.h
72typedef void* SockOptArg;
73#endif // POSIX
74
75#ifdef WIN32
76typedef char* SockOptArg;
77#endif
78
79namespace talk_base {
80
81// Standard MTUs, from RFC 1191
82const uint16 PACKET_MAXIMUMS[] = {
83 65535, // Theoretical maximum, Hyperchannel
84 32000, // Nothing
85 17914, // 16Mb IBM Token Ring
86 8166, // IEEE 802.4
87 //4464, // IEEE 802.5 (4Mb max)
88 4352, // FDDI
89 //2048, // Wideband Network
90 2002, // IEEE 802.5 (4Mb recommended)
91 //1536, // Expermental Ethernet Networks
92 //1500, // Ethernet, Point-to-Point (default)
93 1492, // IEEE 802.3
94 1006, // SLIP, ARPANET
95 //576, // X.25 Networks
96 //544, // DEC IP Portal
97 //512, // NETBIOS
98 508, // IEEE 802/Source-Rt Bridge, ARCNET
99 296, // Point-to-Point (low delay)
100 68, // Official minimum
101 0, // End of list marker
102};
103
// Header sizes (in bytes) subtracted from a candidate MTU to obtain the ping
// payload size, and the timeout applied to each ICMP probe.
static const int IP_HEADER_SIZE = 20;
static const int IPV6_HEADER_SIZE = 40;
static const int ICMP_HEADER_SIZE = 8;
static const int ICMP_PING_TIMEOUT_MILLIS = 10000;
108
109class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
110 public:
111 PhysicalSocket(PhysicalSocketServer* ss, SOCKET s = INVALID_SOCKET)
112 : ss_(ss), s_(s), enabled_events_(0), error_(0),
113 state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
114 resolver_(NULL) {
115#ifdef WIN32
116 // EnsureWinsockInit() ensures that winsock is initialized. The default
117 // version of this function doesn't do anything because winsock is
118 // initialized by constructor of a static object. If neccessary libjingle
119 // users can link it with a different version of this function by replacing
120 // win32socketinit.cc. See win32socketinit.cc for more details.
121 EnsureWinsockInit();
122#endif
123 if (s_ != INVALID_SOCKET) {
124 enabled_events_ = DE_READ | DE_WRITE;
125
126 int type = SOCK_STREAM;
127 socklen_t len = sizeof(type);
128 VERIFY(0 == getsockopt(s_, SOL_SOCKET, SO_TYPE, (SockOptArg)&type, &len));
129 udp_ = (SOCK_DGRAM == type);
130 }
131 }
132
133 virtual ~PhysicalSocket() {
134 Close();
135 }
136
137 // Creates the underlying OS socket (same as the "socket" function).
138 virtual bool Create(int family, int type) {
139 Close();
140 s_ = ::socket(family, type, 0);
141 udp_ = (SOCK_DGRAM == type);
142 UpdateLastError();
143 if (udp_)
144 enabled_events_ = DE_READ | DE_WRITE;
145 return s_ != INVALID_SOCKET;
146 }
147
148 SocketAddress GetLocalAddress() const {
149 sockaddr_storage addr_storage = {0};
150 socklen_t addrlen = sizeof(addr_storage);
151 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
152 int result = ::getsockname(s_, addr, &addrlen);
153 SocketAddress address;
154 if (result >= 0) {
155 SocketAddressFromSockAddrStorage(addr_storage, &address);
156 } else {
157 LOG(LS_WARNING) << "GetLocalAddress: unable to get local addr, socket="
158 << s_;
159 }
160 return address;
161 }
162
163 SocketAddress GetRemoteAddress() const {
164 sockaddr_storage addr_storage = {0};
165 socklen_t addrlen = sizeof(addr_storage);
166 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
167 int result = ::getpeername(s_, addr, &addrlen);
168 SocketAddress address;
169 if (result >= 0) {
170 SocketAddressFromSockAddrStorage(addr_storage, &address);
171 } else {
172 LOG(LS_WARNING) << "GetRemoteAddress: unable to get remote addr, socket="
173 << s_;
174 }
175 return address;
176 }
177
178 int Bind(const SocketAddress& bind_addr) {
179 sockaddr_storage addr_storage;
180 size_t len = bind_addr.ToSockAddrStorage(&addr_storage);
181 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
182 int err = ::bind(s_, addr, static_cast<int>(len));
183 UpdateLastError();
184#ifdef _DEBUG
185 if (0 == err) {
186 dbg_addr_ = "Bound @ ";
187 dbg_addr_.append(GetLocalAddress().ToString());
188 }
189#endif // _DEBUG
190 return err;
191 }
192
193 int Connect(const SocketAddress& addr) {
194 // TODO: Implicit creation is required to reconnect...
195 // ...but should we make it more explicit?
196 if (state_ != CS_CLOSED) {
197 SetError(EALREADY);
198 return SOCKET_ERROR;
199 }
200 if (addr.IsUnresolved()) {
201 LOG(LS_VERBOSE) << "Resolving addr in PhysicalSocket::Connect";
202 resolver_ = new AsyncResolver();
sergeyu@chromium.orga23f0ca2013-11-13 22:48:52 +0000203 resolver_->SignalDone.connect(this, &PhysicalSocket::OnResolveResult);
204 resolver_->Start(addr);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000205 state_ = CS_CONNECTING;
206 return 0;
207 }
208
209 return DoConnect(addr);
210 }
211
212 int DoConnect(const SocketAddress& connect_addr) {
213 if ((s_ == INVALID_SOCKET) &&
214 !Create(connect_addr.family(), SOCK_STREAM)) {
215 return SOCKET_ERROR;
216 }
217 sockaddr_storage addr_storage;
218 size_t len = connect_addr.ToSockAddrStorage(&addr_storage);
219 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
220 int err = ::connect(s_, addr, static_cast<int>(len));
221 UpdateLastError();
222 if (err == 0) {
223 state_ = CS_CONNECTED;
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000224 } else if (IsBlockingError(GetError())) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000225 state_ = CS_CONNECTING;
226 enabled_events_ |= DE_CONNECT;
227 } else {
228 return SOCKET_ERROR;
229 }
230
231 enabled_events_ |= DE_READ | DE_WRITE;
232 return 0;
233 }
234
235 int GetError() const {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000236 CritScope cs(&crit_);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000237 return error_;
238 }
239
240 void SetError(int error) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000241 CritScope cs(&crit_);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000242 error_ = error;
243 }
244
245 ConnState GetState() const {
246 return state_;
247 }
248
249 int GetOption(Option opt, int* value) {
250 int slevel;
251 int sopt;
252 if (TranslateOption(opt, &slevel, &sopt) == -1)
253 return -1;
254 socklen_t optlen = sizeof(*value);
255 int ret = ::getsockopt(s_, slevel, sopt, (SockOptArg)value, &optlen);
256 if (ret != -1 && opt == OPT_DONTFRAGMENT) {
257#ifdef LINUX
258 *value = (*value != IP_PMTUDISC_DONT) ? 1 : 0;
259#endif
260 }
261 return ret;
262 }
263
264 int SetOption(Option opt, int value) {
265 int slevel;
266 int sopt;
267 if (TranslateOption(opt, &slevel, &sopt) == -1)
268 return -1;
269 if (opt == OPT_DONTFRAGMENT) {
270#ifdef LINUX
271 value = (value) ? IP_PMTUDISC_DO : IP_PMTUDISC_DONT;
272#endif
273 }
274 return ::setsockopt(s_, slevel, sopt, (SockOptArg)&value, sizeof(value));
275 }
276
277 int Send(const void *pv, size_t cb) {
278 int sent = ::send(s_, reinterpret_cast<const char *>(pv), (int)cb,
279#ifdef LINUX
280 // Suppress SIGPIPE. Without this, attempting to send on a socket whose
281 // other end is closed will result in a SIGPIPE signal being raised to
282 // our process, which by default will terminate the process, which we
283 // don't want. By specifying this flag, we'll just get the error EPIPE
284 // instead and can handle the error gracefully.
285 MSG_NOSIGNAL
286#else
287 0
288#endif
289 );
290 UpdateLastError();
291 MaybeRemapSendError();
292 // We have seen minidumps where this may be false.
293 ASSERT(sent <= static_cast<int>(cb));
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000294 if ((sent < 0) && IsBlockingError(GetError())) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000295 enabled_events_ |= DE_WRITE;
296 }
297 return sent;
298 }
299
300 int SendTo(const void* buffer, size_t length, const SocketAddress& addr) {
301 sockaddr_storage saddr;
302 size_t len = addr.ToSockAddrStorage(&saddr);
303 int sent = ::sendto(
304 s_, static_cast<const char *>(buffer), static_cast<int>(length),
305#ifdef LINUX
306 // Suppress SIGPIPE. See above for explanation.
307 MSG_NOSIGNAL,
308#else
309 0,
310#endif
311 reinterpret_cast<sockaddr*>(&saddr), static_cast<int>(len));
312 UpdateLastError();
313 MaybeRemapSendError();
314 // We have seen minidumps where this may be false.
315 ASSERT(sent <= static_cast<int>(length));
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000316 if ((sent < 0) && IsBlockingError(GetError())) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000317 enabled_events_ |= DE_WRITE;
318 }
319 return sent;
320 }
321
322 int Recv(void* buffer, size_t length) {
323 int received = ::recv(s_, static_cast<char*>(buffer),
324 static_cast<int>(length), 0);
325 if ((received == 0) && (length != 0)) {
326 // Note: on graceful shutdown, recv can return 0. In this case, we
327 // pretend it is blocking, and then signal close, so that simplifying
328 // assumptions can be made about Recv.
329 LOG(LS_WARNING) << "EOF from socket; deferring close event";
330 // Must turn this back on so that the select() loop will notice the close
331 // event.
332 enabled_events_ |= DE_READ;
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000333 SetError(EWOULDBLOCK);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000334 return SOCKET_ERROR;
335 }
336 UpdateLastError();
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000337 int error = GetError();
338 bool success = (received >= 0) || IsBlockingError(error);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000339 if (udp_ || success) {
340 enabled_events_ |= DE_READ;
341 }
342 if (!success) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000343 LOG_F(LS_VERBOSE) << "Error = " << error;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000344 }
345 return received;
346 }
347
348 int RecvFrom(void* buffer, size_t length, SocketAddress *out_addr) {
349 sockaddr_storage addr_storage;
350 socklen_t addr_len = sizeof(addr_storage);
351 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
352 int received = ::recvfrom(s_, static_cast<char*>(buffer),
353 static_cast<int>(length), 0, addr, &addr_len);
354 UpdateLastError();
355 if ((received >= 0) && (out_addr != NULL))
356 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000357 int error = GetError();
358 bool success = (received >= 0) || IsBlockingError(error);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000359 if (udp_ || success) {
360 enabled_events_ |= DE_READ;
361 }
362 if (!success) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000363 LOG_F(LS_VERBOSE) << "Error = " << error;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000364 }
365 return received;
366 }
367
368 int Listen(int backlog) {
369 int err = ::listen(s_, backlog);
370 UpdateLastError();
371 if (err == 0) {
372 state_ = CS_CONNECTING;
373 enabled_events_ |= DE_ACCEPT;
374#ifdef _DEBUG
375 dbg_addr_ = "Listening @ ";
376 dbg_addr_.append(GetLocalAddress().ToString());
377#endif // _DEBUG
378 }
379 return err;
380 }
381
382 AsyncSocket* Accept(SocketAddress *out_addr) {
383 sockaddr_storage addr_storage;
384 socklen_t addr_len = sizeof(addr_storage);
385 sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
386 SOCKET s = ::accept(s_, addr, &addr_len);
387 UpdateLastError();
388 if (s == INVALID_SOCKET)
389 return NULL;
390 enabled_events_ |= DE_ACCEPT;
391 if (out_addr != NULL)
392 SocketAddressFromSockAddrStorage(addr_storage, out_addr);
393 return ss_->WrapSocket(s);
394 }
395
396 int Close() {
397 if (s_ == INVALID_SOCKET)
398 return 0;
399 int err = ::closesocket(s_);
400 UpdateLastError();
401 s_ = INVALID_SOCKET;
402 state_ = CS_CLOSED;
403 enabled_events_ = 0;
404 if (resolver_) {
405 resolver_->Destroy(false);
406 resolver_ = NULL;
407 }
408 return err;
409 }
410
411 int EstimateMTU(uint16* mtu) {
412 SocketAddress addr = GetRemoteAddress();
413 if (addr.IsAny()) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000414 SetError(ENOTCONN);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000415 return -1;
416 }
417
418#if defined(WIN32)
419 // Gets the interface MTU (TTL=1) for the interface used to reach |addr|.
420 WinPing ping;
421 if (!ping.IsValid()) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000422 SetError(EINVAL); // can't think of a better error ID
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000423 return -1;
424 }
425 int header_size = ICMP_HEADER_SIZE;
426 if (addr.family() == AF_INET6) {
427 header_size += IPV6_HEADER_SIZE;
428 } else if (addr.family() == AF_INET) {
429 header_size += IP_HEADER_SIZE;
430 }
431
432 for (int level = 0; PACKET_MAXIMUMS[level + 1] > 0; ++level) {
433 int32 size = PACKET_MAXIMUMS[level] - header_size;
434 WinPing::PingResult result = ping.Ping(addr.ipaddr(), size,
435 ICMP_PING_TIMEOUT_MILLIS,
436 1, false);
437 if (result == WinPing::PING_FAIL) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000438 SetError(EINVAL); // can't think of a better error ID
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000439 return -1;
440 } else if (result != WinPing::PING_TOO_LARGE) {
441 *mtu = PACKET_MAXIMUMS[level];
442 return 0;
443 }
444 }
445
446 ASSERT(false);
447 return -1;
448#elif defined(IOS) || defined(OSX)
449 // No simple way to do this on Mac OS X.
450 // SIOCGIFMTU would work if we knew which interface would be used, but
451 // figuring that out is pretty complicated. For now we'll return an error
452 // and let the caller pick a default MTU.
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000453 SetError(EINVAL);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000454 return -1;
455#elif defined(LINUX) || defined(ANDROID)
456 // Gets the path MTU.
457 int value;
458 socklen_t vlen = sizeof(value);
459 int err = getsockopt(s_, IPPROTO_IP, IP_MTU, &value, &vlen);
460 if (err < 0) {
461 UpdateLastError();
462 return err;
463 }
464
465 ASSERT((0 <= value) && (value <= 65536));
466 *mtu = value;
467 return 0;
wu@webrtc.orgcecfd182013-10-30 05:18:12 +0000468#elif defined(__native_client__)
469 // Most socket operations, including this, will fail in NaCl's sandbox.
470 error_ = EACCES;
471 return -1;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000472#endif
473 }
474
475 SocketServer* socketserver() { return ss_; }
476
477 protected:
sergeyu@chromium.orga23f0ca2013-11-13 22:48:52 +0000478 void OnResolveResult(AsyncResolverInterface* resolver) {
479 if (resolver != resolver_) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000480 return;
481 }
482
sergeyu@chromium.orga23f0ca2013-11-13 22:48:52 +0000483 int error = resolver_->GetError();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000484 if (error == 0) {
485 error = DoConnect(resolver_->address());
486 } else {
487 Close();
488 }
489
490 if (error) {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000491 SetError(error);
492 SignalCloseEvent(this, error);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000493 }
494 }
495
496 void UpdateLastError() {
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000497 SetError(LAST_SYSTEM_ERROR);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000498 }
499
// On OSX, rewrites a recorded ENOBUFS into EWOULDBLOCK. Has no effect on
// other platforms.
void MaybeRemapSendError() {
#if defined(OSX)
  // https://developer.apple.com/library/mac/documentation/Darwin/
  // Reference/ManPages/man2/sendto.2.html
  // ENOBUFS - The output queue for a network interface is full.
  // This generally indicates that the interface has stopped sending,
  // but may be caused by transient congestion.
  const bool queue_full = (GetError() == ENOBUFS);
  if (queue_full) {
    SetError(EWOULDBLOCK);
  }
#endif
}
512
513 static int TranslateOption(Option opt, int* slevel, int* sopt) {
514 switch (opt) {
515 case OPT_DONTFRAGMENT:
516#ifdef WIN32
517 *slevel = IPPROTO_IP;
518 *sopt = IP_DONTFRAGMENT;
519 break;
520#elif defined(IOS) || defined(OSX) || defined(BSD)
521 LOG(LS_WARNING) << "Socket::OPT_DONTFRAGMENT not supported.";
522 return -1;
523#elif defined(POSIX)
524 *slevel = IPPROTO_IP;
525 *sopt = IP_MTU_DISCOVER;
526 break;
527#endif
528 case OPT_RCVBUF:
529 *slevel = SOL_SOCKET;
530 *sopt = SO_RCVBUF;
531 break;
532 case OPT_SNDBUF:
533 *slevel = SOL_SOCKET;
534 *sopt = SO_SNDBUF;
535 break;
536 case OPT_NODELAY:
537 *slevel = IPPROTO_TCP;
538 *sopt = TCP_NODELAY;
539 break;
wu@webrtc.org97077a32013-10-25 21:18:33 +0000540 case OPT_DSCP:
541 LOG(LS_WARNING) << "Socket::OPT_DSCP not supported.";
542 return -1;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000543 default:
544 ASSERT(false);
545 return -1;
546 }
547 return 0;
548 }
549
550 PhysicalSocketServer* ss_;
551 SOCKET s_;
552 uint8 enabled_events_;
553 bool udp_;
554 int error_;
wu@webrtc.orgd371a292013-10-23 23:56:09 +0000555 // Protects |error_| that is accessed from different threads.
556 mutable CriticalSection crit_;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000557 ConnState state_;
558 AsyncResolver* resolver_;
559
560#ifdef _DEBUG
561 std::string dbg_addr_;
562#endif // _DEBUG;
563};
564
565#ifdef POSIX
566class EventDispatcher : public Dispatcher {
567 public:
568 EventDispatcher(PhysicalSocketServer* ss) : ss_(ss), fSignaled_(false) {
569 if (pipe(afd_) < 0)
570 LOG(LERROR) << "pipe failed";
571 ss_->Add(this);
572 }
573
574 virtual ~EventDispatcher() {
575 ss_->Remove(this);
576 close(afd_[0]);
577 close(afd_[1]);
578 }
579
580 virtual void Signal() {
581 CritScope cs(&crit_);
582 if (!fSignaled_) {
583 const uint8 b[1] = { 0 };
584 if (VERIFY(1 == write(afd_[1], b, sizeof(b)))) {
585 fSignaled_ = true;
586 }
587 }
588 }
589
590 virtual uint32 GetRequestedEvents() {
591 return DE_READ;
592 }
593
594 virtual void OnPreEvent(uint32 ff) {
595 // It is not possible to perfectly emulate an auto-resetting event with
596 // pipes. This simulates it by resetting before the event is handled.
597
598 CritScope cs(&crit_);
599 if (fSignaled_) {
600 uint8 b[4]; // Allow for reading more than 1 byte, but expect 1.
601 VERIFY(1 == read(afd_[0], b, sizeof(b)));
602 fSignaled_ = false;
603 }
604 }
605
606 virtual void OnEvent(uint32 ff, int err) {
607 ASSERT(false);
608 }
609
610 virtual int GetDescriptor() {
611 return afd_[0];
612 }
613
614 virtual bool IsDescriptorClosed() {
615 return false;
616 }
617
618 private:
619 PhysicalSocketServer *ss_;
620 int afd_[2];
621 bool fSignaled_;
622 CriticalSection crit_;
623};
624
625// These two classes use the self-pipe trick to deliver POSIX signals to our
626// select loop. This is the only safe, reliable, cross-platform way to do
627// non-trivial things with a POSIX signal in an event-driven program (until
628// proper pselect() implementations become ubiquitous).
629
630class PosixSignalHandler {
631 public:
632 // POSIX only specifies 32 signals, but in principle the system might have
633 // more and the programmer might choose to use them, so we size our array
634 // for 128.
635 static const int kNumPosixSignals = 128;
636
637 // There is just a single global instance. (Signal handlers do not get any
638 // sort of user-defined void * parameter, so they can't access anything that
639 // isn't global.)
640 static PosixSignalHandler* Instance() {
641 LIBJINGLE_DEFINE_STATIC_LOCAL(PosixSignalHandler, instance, ());
642 return &instance;
643 }
644
645 // Returns true if the given signal number is set.
646 bool IsSignalSet(int signum) const {
647 ASSERT(signum < ARRAY_SIZE(received_signal_));
648 if (signum < ARRAY_SIZE(received_signal_)) {
649 return received_signal_[signum];
650 } else {
651 return false;
652 }
653 }
654
655 // Clears the given signal number.
656 void ClearSignal(int signum) {
657 ASSERT(signum < ARRAY_SIZE(received_signal_));
658 if (signum < ARRAY_SIZE(received_signal_)) {
659 received_signal_[signum] = false;
660 }
661 }
662
663 // Returns the file descriptor to monitor for signal events.
664 int GetDescriptor() const {
665 return afd_[0];
666 }
667
668 // This is called directly from our real signal handler, so it must be
669 // signal-handler-safe. That means it cannot assume anything about the
670 // user-level state of the process, since the handler could be executed at any
671 // time on any thread.
672 void OnPosixSignalReceived(int signum) {
673 if (signum >= ARRAY_SIZE(received_signal_)) {
674 // We don't have space in our array for this.
675 return;
676 }
677 // Set a flag saying we've seen this signal.
678 received_signal_[signum] = true;
679 // Notify application code that we got a signal.
680 const uint8 b[1] = { 0 };
681 if (-1 == write(afd_[1], b, sizeof(b))) {
682 // Nothing we can do here. If there's an error somehow then there's
683 // nothing we can safely do from a signal handler.
684 // No, we can't even safely log it.
685 // But, we still have to check the return value here. Otherwise,
686 // GCC 4.4.1 complains ignoring return value. Even (void) doesn't help.
687 return;
688 }
689 }
690
691 private:
692 PosixSignalHandler() {
693 if (pipe(afd_) < 0) {
694 LOG_ERR(LS_ERROR) << "pipe failed";
695 return;
696 }
697 if (fcntl(afd_[0], F_SETFL, O_NONBLOCK) < 0) {
698 LOG_ERR(LS_WARNING) << "fcntl #1 failed";
699 }
700 if (fcntl(afd_[1], F_SETFL, O_NONBLOCK) < 0) {
701 LOG_ERR(LS_WARNING) << "fcntl #2 failed";
702 }
703 memset(const_cast<void *>(static_cast<volatile void *>(received_signal_)),
704 0,
705 sizeof(received_signal_));
706 }
707
708 ~PosixSignalHandler() {
709 int fd1 = afd_[0];
710 int fd2 = afd_[1];
711 // We clobber the stored file descriptor numbers here or else in principle
712 // a signal that happens to be delivered during application termination
713 // could erroneously write a zero byte to an unrelated file handle in
714 // OnPosixSignalReceived() if some other file happens to be opened later
715 // during shutdown and happens to be given the same file descriptor number
716 // as our pipe had. Unfortunately even with this precaution there is still a
717 // race where that could occur if said signal happens to be handled
718 // concurrently with this code and happens to have already read the value of
719 // afd_[1] from memory before we clobber it, but that's unlikely.
720 afd_[0] = -1;
721 afd_[1] = -1;
722 close(fd1);
723 close(fd2);
724 }
725
726 int afd_[2];
727 // These are boolean flags that will be set in our signal handler and read
728 // and cleared from Wait(). There is a race involved in this, but it is
729 // benign. The signal handler sets the flag before signaling the pipe, so
730 // we'll never end up blocking in select() while a flag is still true.
731 // However, if two of the same signal arrive close to each other then it's
732 // possible that the second time the handler may set the flag while it's still
733 // true, meaning that signal will be missed. But the first occurrence of it
734 // will still be handled, so this isn't a problem.
735 // Volatile is not necessary here for correctness, but this data _is_ volatile
736 // so I've marked it as such.
737 volatile uint8 received_signal_[kNumPosixSignals];
738};
739
740class PosixSignalDispatcher : public Dispatcher {
741 public:
742 PosixSignalDispatcher(PhysicalSocketServer *owner) : owner_(owner) {
743 owner_->Add(this);
744 }
745
746 virtual ~PosixSignalDispatcher() {
747 owner_->Remove(this);
748 }
749
750 virtual uint32 GetRequestedEvents() {
751 return DE_READ;
752 }
753
754 virtual void OnPreEvent(uint32 ff) {
755 // Events might get grouped if signals come very fast, so we read out up to
756 // 16 bytes to make sure we keep the pipe empty.
757 uint8 b[16];
758 ssize_t ret = read(GetDescriptor(), b, sizeof(b));
759 if (ret < 0) {
760 LOG_ERR(LS_WARNING) << "Error in read()";
761 } else if (ret == 0) {
762 LOG(LS_WARNING) << "Should have read at least one byte";
763 }
764 }
765
766 virtual void OnEvent(uint32 ff, int err) {
767 for (int signum = 0; signum < PosixSignalHandler::kNumPosixSignals;
768 ++signum) {
769 if (PosixSignalHandler::Instance()->IsSignalSet(signum)) {
770 PosixSignalHandler::Instance()->ClearSignal(signum);
771 HandlerMap::iterator i = handlers_.find(signum);
772 if (i == handlers_.end()) {
773 // This can happen if a signal is delivered to our process at around
774 // the same time as we unset our handler for it. It is not an error
775 // condition, but it's unusual enough to be worth logging.
776 LOG(LS_INFO) << "Received signal with no handler: " << signum;
777 } else {
778 // Otherwise, execute our handler.
779 (*i->second)(signum);
780 }
781 }
782 }
783 }
784
785 virtual int GetDescriptor() {
786 return PosixSignalHandler::Instance()->GetDescriptor();
787 }
788
789 virtual bool IsDescriptorClosed() {
790 return false;
791 }
792
793 void SetHandler(int signum, void (*handler)(int)) {
794 handlers_[signum] = handler;
795 }
796
797 void ClearHandler(int signum) {
798 handlers_.erase(signum);
799 }
800
801 bool HasHandlers() {
802 return !handlers_.empty();
803 }
804
805 private:
806 typedef std::map<int, void (*)(int)> HandlerMap;
807
808 HandlerMap handlers_;
809 // Our owner.
810 PhysicalSocketServer *owner_;
811};
812
813class SocketDispatcher : public Dispatcher, public PhysicalSocket {
814 public:
815 explicit SocketDispatcher(PhysicalSocketServer *ss) : PhysicalSocket(ss) {
816 }
817 SocketDispatcher(SOCKET s, PhysicalSocketServer *ss) : PhysicalSocket(ss, s) {
818 }
819
820 virtual ~SocketDispatcher() {
821 Close();
822 }
823
824 bool Initialize() {
825 ss_->Add(this);
826 fcntl(s_, F_SETFL, fcntl(s_, F_GETFL, 0) | O_NONBLOCK);
827 return true;
828 }
829
830 virtual bool Create(int type) {
831 return Create(AF_INET, type);
832 }
833
834 virtual bool Create(int family, int type) {
835 // Change the socket to be non-blocking.
836 if (!PhysicalSocket::Create(family, type))
837 return false;
838
839 return Initialize();
840 }
841
842 virtual int GetDescriptor() {
843 return s_;
844 }
845
846 virtual bool IsDescriptorClosed() {
847 // We don't have a reliable way of distinguishing end-of-stream
848 // from readability. So test on each readable call. Is this
849 // inefficient? Probably.
850 char ch;
851 ssize_t res = ::recv(s_, &ch, 1, MSG_PEEK);
852 if (res > 0) {
853 // Data available, so not closed.
854 return false;
855 } else if (res == 0) {
856 // EOF, so closed.
857 return true;
858 } else { // error
859 switch (errno) {
860 // Returned if we've already closed s_.
861 case EBADF:
862 // Returned during ungraceful peer shutdown.
863 case ECONNRESET:
864 return true;
865 default:
866 // Assume that all other errors are just blocking errors, meaning the
867 // connection is still good but we just can't read from it right now.
868 // This should only happen when connecting (and at most once), because
869 // in all other cases this function is only called if the file
870 // descriptor is already known to be in the readable state. However,
871 // it's not necessary a problem if we spuriously interpret a
872 // "connection lost"-type error as a blocking error, because typically
873 // the next recv() will get EOF, so we'll still eventually notice that
874 // the socket is closed.
875 LOG_ERR(LS_WARNING) << "Assuming benign blocking error";
876 return false;
877 }
878 }
879 }
880
881 virtual uint32 GetRequestedEvents() {
882 return enabled_events_;
883 }
884
885 virtual void OnPreEvent(uint32 ff) {
886 if ((ff & DE_CONNECT) != 0)
887 state_ = CS_CONNECTED;
888 if ((ff & DE_CLOSE) != 0)
889 state_ = CS_CLOSED;
890 }
891
892 virtual void OnEvent(uint32 ff, int err) {
893 // Make sure we deliver connect/accept first. Otherwise, consumers may see
894 // something like a READ followed by a CONNECT, which would be odd.
895 if ((ff & DE_CONNECT) != 0) {
896 enabled_events_ &= ~DE_CONNECT;
897 SignalConnectEvent(this);
898 }
899 if ((ff & DE_ACCEPT) != 0) {
900 enabled_events_ &= ~DE_ACCEPT;
901 SignalReadEvent(this);
902 }
903 if ((ff & DE_READ) != 0) {
904 enabled_events_ &= ~DE_READ;
905 SignalReadEvent(this);
906 }
907 if ((ff & DE_WRITE) != 0) {
908 enabled_events_ &= ~DE_WRITE;
909 SignalWriteEvent(this);
910 }
911 if ((ff & DE_CLOSE) != 0) {
912 // The socket is now dead to us, so stop checking it.
913 enabled_events_ = 0;
914 SignalCloseEvent(this, err);
915 }
916 }
917
918 virtual int Close() {
919 if (s_ == INVALID_SOCKET)
920 return 0;
921
922 ss_->Remove(this);
923 return PhysicalSocket::Close();
924 }
925};
926
927class FileDispatcher: public Dispatcher, public AsyncFile {
928 public:
929 FileDispatcher(int fd, PhysicalSocketServer *ss) : ss_(ss), fd_(fd) {
930 set_readable(true);
931
932 ss_->Add(this);
933
934 fcntl(fd_, F_SETFL, fcntl(fd_, F_GETFL, 0) | O_NONBLOCK);
935 }
936
937 virtual ~FileDispatcher() {
938 ss_->Remove(this);
939 }
940
941 SocketServer* socketserver() { return ss_; }
942
943 virtual int GetDescriptor() {
944 return fd_;
945 }
946
947 virtual bool IsDescriptorClosed() {
948 return false;
949 }
950
951 virtual uint32 GetRequestedEvents() {
952 return flags_;
953 }
954
955 virtual void OnPreEvent(uint32 ff) {
956 }
957
958 virtual void OnEvent(uint32 ff, int err) {
959 if ((ff & DE_READ) != 0)
960 SignalReadEvent(this);
961 if ((ff & DE_WRITE) != 0)
962 SignalWriteEvent(this);
963 if ((ff & DE_CLOSE) != 0)
964 SignalCloseEvent(this, err);
965 }
966
967 virtual bool readable() {
968 return (flags_ & DE_READ) != 0;
969 }
970
971 virtual void set_readable(bool value) {
972 flags_ = value ? (flags_ | DE_READ) : (flags_ & ~DE_READ);
973 }
974
975 virtual bool writable() {
976 return (flags_ & DE_WRITE) != 0;
977 }
978
979 virtual void set_writable(bool value) {
980 flags_ = value ? (flags_ | DE_WRITE) : (flags_ & ~DE_WRITE);
981 }
982
983 private:
984 PhysicalSocketServer* ss_;
985 int fd_;
986 int flags_;
987};
988
989AsyncFile* PhysicalSocketServer::CreateFile(int fd) {
990 return new FileDispatcher(fd, this);
991}
992
993#endif // POSIX
994
995#ifdef WIN32
996static uint32 FlagsToEvents(uint32 events) {
997 uint32 ffFD = FD_CLOSE;
998 if (events & DE_READ)
999 ffFD |= FD_READ;
1000 if (events & DE_WRITE)
1001 ffFD |= FD_WRITE;
1002 if (events & DE_CONNECT)
1003 ffFD |= FD_CONNECT;
1004 if (events & DE_ACCEPT)
1005 ffFD |= FD_ACCEPT;
1006 return ffFD;
1007}
1008
1009class EventDispatcher : public Dispatcher {
1010 public:
1011 EventDispatcher(PhysicalSocketServer *ss) : ss_(ss) {
1012 hev_ = WSACreateEvent();
1013 if (hev_) {
1014 ss_->Add(this);
1015 }
1016 }
1017
1018 ~EventDispatcher() {
1019 if (hev_ != NULL) {
1020 ss_->Remove(this);
1021 WSACloseEvent(hev_);
1022 hev_ = NULL;
1023 }
1024 }
1025
1026 virtual void Signal() {
1027 if (hev_ != NULL)
1028 WSASetEvent(hev_);
1029 }
1030
1031 virtual uint32 GetRequestedEvents() {
1032 return 0;
1033 }
1034
1035 virtual void OnPreEvent(uint32 ff) {
1036 WSAResetEvent(hev_);
1037 }
1038
1039 virtual void OnEvent(uint32 ff, int err) {
1040 }
1041
1042 virtual WSAEVENT GetWSAEvent() {
1043 return hev_;
1044 }
1045
1046 virtual SOCKET GetSocket() {
1047 return INVALID_SOCKET;
1048 }
1049
1050 virtual bool CheckSignalClose() { return false; }
1051
1052private:
1053 PhysicalSocketServer* ss_;
1054 WSAEVENT hev_;
1055};
1056
1057class SocketDispatcher : public Dispatcher, public PhysicalSocket {
1058 public:
1059 static int next_id_;
1060 int id_;
1061 bool signal_close_;
1062 int signal_err_;
1063
1064 SocketDispatcher(PhysicalSocketServer* ss)
1065 : PhysicalSocket(ss),
1066 id_(0),
1067 signal_close_(false) {
1068 }
1069
1070 SocketDispatcher(SOCKET s, PhysicalSocketServer* ss)
1071 : PhysicalSocket(ss, s),
1072 id_(0),
1073 signal_close_(false) {
1074 }
1075
1076 virtual ~SocketDispatcher() {
1077 Close();
1078 }
1079
1080 bool Initialize() {
1081 ASSERT(s_ != INVALID_SOCKET);
1082 // Must be a non-blocking
1083 u_long argp = 1;
1084 ioctlsocket(s_, FIONBIO, &argp);
1085 ss_->Add(this);
1086 return true;
1087 }
1088
1089 virtual bool Create(int type) {
1090 return Create(AF_INET, type);
1091 }
1092
1093 virtual bool Create(int family, int type) {
1094 // Create socket
1095 if (!PhysicalSocket::Create(family, type))
1096 return false;
1097
1098 if (!Initialize())
1099 return false;
1100
1101 do { id_ = ++next_id_; } while (id_ == 0);
1102 return true;
1103 }
1104
1105 virtual int Close() {
1106 if (s_ == INVALID_SOCKET)
1107 return 0;
1108
1109 id_ = 0;
1110 signal_close_ = false;
1111 ss_->Remove(this);
1112 return PhysicalSocket::Close();
1113 }
1114
1115 virtual uint32 GetRequestedEvents() {
1116 return enabled_events_;
1117 }
1118
1119 virtual void OnPreEvent(uint32 ff) {
1120 if ((ff & DE_CONNECT) != 0)
1121 state_ = CS_CONNECTED;
1122 // We set CS_CLOSED from CheckSignalClose.
1123 }
1124
1125 virtual void OnEvent(uint32 ff, int err) {
1126 int cache_id = id_;
1127 // Make sure we deliver connect/accept first. Otherwise, consumers may see
1128 // something like a READ followed by a CONNECT, which would be odd.
1129 if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
1130 if (ff != DE_CONNECT)
1131 LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
1132 enabled_events_ &= ~DE_CONNECT;
1133#ifdef _DEBUG
1134 dbg_addr_ = "Connected @ ";
1135 dbg_addr_.append(GetRemoteAddress().ToString());
1136#endif // _DEBUG
1137 SignalConnectEvent(this);
1138 }
1139 if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
1140 enabled_events_ &= ~DE_ACCEPT;
1141 SignalReadEvent(this);
1142 }
1143 if ((ff & DE_READ) != 0) {
1144 enabled_events_ &= ~DE_READ;
1145 SignalReadEvent(this);
1146 }
1147 if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
1148 enabled_events_ &= ~DE_WRITE;
1149 SignalWriteEvent(this);
1150 }
1151 if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
1152 signal_close_ = true;
1153 signal_err_ = err;
1154 }
1155 }
1156
1157 virtual WSAEVENT GetWSAEvent() {
1158 return WSA_INVALID_EVENT;
1159 }
1160
1161 virtual SOCKET GetSocket() {
1162 return s_;
1163 }
1164
1165 virtual bool CheckSignalClose() {
1166 if (!signal_close_)
1167 return false;
1168
1169 char ch;
1170 if (recv(s_, &ch, 1, MSG_PEEK) > 0)
1171 return false;
1172
1173 state_ = CS_CLOSED;
1174 signal_close_ = false;
1175 SignalCloseEvent(this, signal_err_);
1176 return true;
1177 }
1178};
1179
1180int SocketDispatcher::next_id_ = 0;
1181
1182#endif // WIN32
1183
1184// Sets the value of a boolean value to false when signaled.
1185class Signaler : public EventDispatcher {
1186 public:
1187 Signaler(PhysicalSocketServer* ss, bool* pf)
1188 : EventDispatcher(ss), pf_(pf) {
1189 }
1190 virtual ~Signaler() { }
1191
1192 void OnEvent(uint32 ff, int err) {
1193 if (pf_)
1194 *pf_ = false;
1195 }
1196
1197 private:
1198 bool *pf_;
1199};
1200
1201PhysicalSocketServer::PhysicalSocketServer()
1202 : fWait_(false),
1203 last_tick_tracked_(0),
1204 last_tick_dispatch_count_(0) {
1205 signal_wakeup_ = new Signaler(this, &fWait_);
1206#ifdef WIN32
1207 socket_ev_ = WSACreateEvent();
1208#endif
1209}
1210
1211PhysicalSocketServer::~PhysicalSocketServer() {
1212#ifdef WIN32
1213 WSACloseEvent(socket_ev_);
1214#endif
1215#ifdef POSIX
1216 signal_dispatcher_.reset();
1217#endif
1218 delete signal_wakeup_;
1219 ASSERT(dispatchers_.empty());
1220}
1221
1222void PhysicalSocketServer::WakeUp() {
1223 signal_wakeup_->Signal();
1224}
1225
1226Socket* PhysicalSocketServer::CreateSocket(int type) {
1227 return CreateSocket(AF_INET, type);
1228}
1229
1230Socket* PhysicalSocketServer::CreateSocket(int family, int type) {
1231 PhysicalSocket* socket = new PhysicalSocket(this);
1232 if (socket->Create(family, type)) {
1233 return socket;
1234 } else {
1235 delete socket;
1236 return 0;
1237 }
1238}
1239
1240AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int type) {
1241 return CreateAsyncSocket(AF_INET, type);
1242}
1243
1244AsyncSocket* PhysicalSocketServer::CreateAsyncSocket(int family, int type) {
1245 SocketDispatcher* dispatcher = new SocketDispatcher(this);
1246 if (dispatcher->Create(family, type)) {
1247 return dispatcher;
1248 } else {
1249 delete dispatcher;
1250 return 0;
1251 }
1252}
1253
1254AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
1255 SocketDispatcher* dispatcher = new SocketDispatcher(s, this);
1256 if (dispatcher->Initialize()) {
1257 return dispatcher;
1258 } else {
1259 delete dispatcher;
1260 return 0;
1261 }
1262}
1263
1264void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
1265 CritScope cs(&crit_);
1266 // Prevent duplicates. This can cause dead dispatchers to stick around.
1267 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1268 dispatchers_.end(),
1269 pdispatcher);
1270 if (pos != dispatchers_.end())
1271 return;
1272 dispatchers_.push_back(pdispatcher);
1273}
1274
1275void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
1276 CritScope cs(&crit_);
1277 DispatcherList::iterator pos = std::find(dispatchers_.begin(),
1278 dispatchers_.end(),
1279 pdispatcher);
wu@webrtc.org967bfff2013-09-19 05:49:50 +00001280 // We silently ignore duplicate calls to Add, so we should silently ignore
1281 // the (expected) symmetric calls to Remove. Note that this may still hide
1282 // a real issue, so we at least log a warning about it.
1283 if (pos == dispatchers_.end()) {
1284 LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
1285 << "dispatcher, potentially from a duplicate call to Add.";
1286 return;
1287 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001288 size_t index = pos - dispatchers_.begin();
1289 dispatchers_.erase(pos);
1290 for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
1291 ++it) {
1292 if (index < **it) {
1293 --**it;
1294 }
1295 }
1296}
1297
1298#ifdef POSIX
1299bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1300 // Calculate timing information
1301
1302 struct timeval *ptvWait = NULL;
1303 struct timeval tvWait;
1304 struct timeval tvStop;
1305 if (cmsWait != kForever) {
1306 // Calculate wait timeval
1307 tvWait.tv_sec = cmsWait / 1000;
1308 tvWait.tv_usec = (cmsWait % 1000) * 1000;
1309 ptvWait = &tvWait;
1310
1311 // Calculate when to return in a timeval
1312 gettimeofday(&tvStop, NULL);
1313 tvStop.tv_sec += tvWait.tv_sec;
1314 tvStop.tv_usec += tvWait.tv_usec;
1315 if (tvStop.tv_usec >= 1000000) {
1316 tvStop.tv_usec -= 1000000;
1317 tvStop.tv_sec += 1;
1318 }
1319 }
1320
1321 // Zero all fd_sets. Don't need to do this inside the loop since
1322 // select() zeros the descriptors not signaled
1323
1324 fd_set fdsRead;
1325 FD_ZERO(&fdsRead);
1326 fd_set fdsWrite;
1327 FD_ZERO(&fdsWrite);
1328
1329 fWait_ = true;
1330
1331 while (fWait_) {
1332 int fdmax = -1;
1333 {
1334 CritScope cr(&crit_);
1335 for (size_t i = 0; i < dispatchers_.size(); ++i) {
1336 // Query dispatchers for read and write wait state
1337 Dispatcher *pdispatcher = dispatchers_[i];
1338 ASSERT(pdispatcher);
1339 if (!process_io && (pdispatcher != signal_wakeup_))
1340 continue;
1341 int fd = pdispatcher->GetDescriptor();
1342 if (fd > fdmax)
1343 fdmax = fd;
1344
1345 uint32 ff = pdispatcher->GetRequestedEvents();
1346 if (ff & (DE_READ | DE_ACCEPT))
1347 FD_SET(fd, &fdsRead);
1348 if (ff & (DE_WRITE | DE_CONNECT))
1349 FD_SET(fd, &fdsWrite);
1350 }
1351 }
1352
1353 // Wait then call handlers as appropriate
1354 // < 0 means error
1355 // 0 means timeout
1356 // > 0 means count of descriptors ready
1357 int n = select(fdmax + 1, &fdsRead, &fdsWrite, NULL, ptvWait);
1358
1359 // If error, return error.
1360 if (n < 0) {
1361 if (errno != EINTR) {
1362 LOG_E(LS_ERROR, EN, errno) << "select";
1363 return false;
1364 }
1365 // Else ignore the error and keep going. If this EINTR was for one of the
1366 // signals managed by this PhysicalSocketServer, the
1367 // PosixSignalDeliveryDispatcher will be in the signaled state in the next
1368 // iteration.
1369 } else if (n == 0) {
1370 // If timeout, return success
1371 return true;
1372 } else {
1373 // We have signaled descriptors
1374 CritScope cr(&crit_);
1375 for (size_t i = 0; i < dispatchers_.size(); ++i) {
1376 Dispatcher *pdispatcher = dispatchers_[i];
1377 int fd = pdispatcher->GetDescriptor();
1378 uint32 ff = 0;
1379 int errcode = 0;
1380
1381 // Reap any error code, which can be signaled through reads or writes.
1382 // TODO: Should we set errcode if getsockopt fails?
1383 if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
1384 socklen_t len = sizeof(errcode);
1385 ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
1386 }
1387
1388 // Check readable descriptors. If we're waiting on an accept, signal
1389 // that. Otherwise we're waiting for data, check to see if we're
1390 // readable or really closed.
1391 // TODO: Only peek at TCP descriptors.
1392 if (FD_ISSET(fd, &fdsRead)) {
1393 FD_CLR(fd, &fdsRead);
1394 if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
1395 ff |= DE_ACCEPT;
1396 } else if (errcode || pdispatcher->IsDescriptorClosed()) {
1397 ff |= DE_CLOSE;
1398 } else {
1399 ff |= DE_READ;
1400 }
1401 }
1402
1403 // Check writable descriptors. If we're waiting on a connect, detect
1404 // success versus failure by the reaped error code.
1405 if (FD_ISSET(fd, &fdsWrite)) {
1406 FD_CLR(fd, &fdsWrite);
1407 if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
1408 if (!errcode) {
1409 ff |= DE_CONNECT;
1410 } else {
1411 ff |= DE_CLOSE;
1412 }
1413 } else {
1414 ff |= DE_WRITE;
1415 }
1416 }
1417
1418 // Tell the descriptor about the event.
1419 if (ff != 0) {
1420 pdispatcher->OnPreEvent(ff);
1421 pdispatcher->OnEvent(ff, errcode);
1422 }
1423 }
1424 }
1425
1426 // Recalc the time remaining to wait. Doing it here means it doesn't get
1427 // calced twice the first time through the loop
1428 if (ptvWait) {
1429 ptvWait->tv_sec = 0;
1430 ptvWait->tv_usec = 0;
1431 struct timeval tvT;
1432 gettimeofday(&tvT, NULL);
1433 if ((tvStop.tv_sec > tvT.tv_sec)
1434 || ((tvStop.tv_sec == tvT.tv_sec)
1435 && (tvStop.tv_usec > tvT.tv_usec))) {
1436 ptvWait->tv_sec = tvStop.tv_sec - tvT.tv_sec;
1437 ptvWait->tv_usec = tvStop.tv_usec - tvT.tv_usec;
1438 if (ptvWait->tv_usec < 0) {
1439 ASSERT(ptvWait->tv_sec > 0);
1440 ptvWait->tv_usec += 1000000;
1441 ptvWait->tv_sec -= 1;
1442 }
1443 }
1444 }
1445 }
1446
1447 return true;
1448}
1449
1450static void GlobalSignalHandler(int signum) {
1451 PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
1452}
1453
1454bool PhysicalSocketServer::SetPosixSignalHandler(int signum,
1455 void (*handler)(int)) {
1456 // If handler is SIG_IGN or SIG_DFL then clear our user-level handler,
1457 // otherwise set one.
1458 if (handler == SIG_IGN || handler == SIG_DFL) {
1459 if (!InstallSignal(signum, handler)) {
1460 return false;
1461 }
1462 if (signal_dispatcher_) {
1463 signal_dispatcher_->ClearHandler(signum);
1464 if (!signal_dispatcher_->HasHandlers()) {
1465 signal_dispatcher_.reset();
1466 }
1467 }
1468 } else {
1469 if (!signal_dispatcher_) {
1470 signal_dispatcher_.reset(new PosixSignalDispatcher(this));
1471 }
1472 signal_dispatcher_->SetHandler(signum, handler);
1473 if (!InstallSignal(signum, &GlobalSignalHandler)) {
1474 return false;
1475 }
1476 }
1477 return true;
1478}
1479
1480Dispatcher* PhysicalSocketServer::signal_dispatcher() {
1481 return signal_dispatcher_.get();
1482}
1483
1484bool PhysicalSocketServer::InstallSignal(int signum, void (*handler)(int)) {
1485 struct sigaction act;
1486 // It doesn't really matter what we set this mask to.
1487 if (sigemptyset(&act.sa_mask) != 0) {
1488 LOG_ERR(LS_ERROR) << "Couldn't set mask";
1489 return false;
1490 }
1491 act.sa_handler = handler;
1492 // Use SA_RESTART so that our syscalls don't get EINTR, since we don't need it
1493 // and it's a nuisance. Though some syscalls still return EINTR and there's no
1494 // real standard for which ones. :(
1495 act.sa_flags = SA_RESTART;
1496 if (sigaction(signum, &act, NULL) != 0) {
1497 LOG_ERR(LS_ERROR) << "Couldn't set sigaction";
1498 return false;
1499 }
1500 return true;
1501}
1502#endif // POSIX
1503
1504#ifdef WIN32
1505bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
1506 int cmsTotal = cmsWait;
1507 int cmsElapsed = 0;
1508 uint32 msStart = Time();
1509
1510#if LOGGING
1511 if (last_tick_dispatch_count_ == 0) {
1512 last_tick_tracked_ = msStart;
1513 }
1514#endif
1515
1516 fWait_ = true;
1517 while (fWait_) {
1518 std::vector<WSAEVENT> events;
1519 std::vector<Dispatcher *> event_owners;
1520
1521 events.push_back(socket_ev_);
1522
1523 {
1524 CritScope cr(&crit_);
1525 size_t i = 0;
1526 iterators_.push_back(&i);
1527 // Don't track dispatchers_.size(), because we want to pick up any new
1528 // dispatchers that were added while processing the loop.
1529 while (i < dispatchers_.size()) {
1530 Dispatcher* disp = dispatchers_[i++];
1531 if (!process_io && (disp != signal_wakeup_))
1532 continue;
1533 SOCKET s = disp->GetSocket();
1534 if (disp->CheckSignalClose()) {
1535 // We just signalled close, don't poll this socket
1536 } else if (s != INVALID_SOCKET) {
1537 WSAEventSelect(s,
1538 events[0],
1539 FlagsToEvents(disp->GetRequestedEvents()));
1540 } else {
1541 events.push_back(disp->GetWSAEvent());
1542 event_owners.push_back(disp);
1543 }
1544 }
1545 ASSERT(iterators_.back() == &i);
1546 iterators_.pop_back();
1547 }
1548
1549 // Which is shorter, the delay wait or the asked wait?
1550
1551 int cmsNext;
1552 if (cmsWait == kForever) {
1553 cmsNext = cmsWait;
1554 } else {
1555 cmsNext = _max(0, cmsTotal - cmsElapsed);
1556 }
1557
1558 // Wait for one of the events to signal
1559 DWORD dw = WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()),
1560 &events[0],
1561 false,
1562 cmsNext,
1563 false);
1564
1565#if 0 // LOGGING
1566 // we track this information purely for logging purposes.
1567 last_tick_dispatch_count_++;
1568 if (last_tick_dispatch_count_ >= 1000) {
1569 int32 elapsed = TimeSince(last_tick_tracked_);
1570 LOG(INFO) << "PhysicalSocketServer took " << elapsed
1571 << "ms for 1000 events";
1572
1573 // If we get more than 1000 events in a second, we are spinning badly
1574 // (normally it should take about 8-20 seconds).
1575 ASSERT(elapsed > 1000);
1576
1577 last_tick_tracked_ = Time();
1578 last_tick_dispatch_count_ = 0;
1579 }
1580#endif
1581
1582 if (dw == WSA_WAIT_FAILED) {
1583 // Failed?
1584 // TODO: need a better strategy than this!
1585 int error = WSAGetLastError();
1586 ASSERT(false);
1587 return false;
1588 } else if (dw == WSA_WAIT_TIMEOUT) {
1589 // Timeout?
1590 return true;
1591 } else {
1592 // Figure out which one it is and call it
1593 CritScope cr(&crit_);
1594 int index = dw - WSA_WAIT_EVENT_0;
1595 if (index > 0) {
1596 --index; // The first event is the socket event
1597 event_owners[index]->OnPreEvent(0);
1598 event_owners[index]->OnEvent(0, 0);
1599 } else if (process_io) {
1600 size_t i = 0, end = dispatchers_.size();
1601 iterators_.push_back(&i);
1602 iterators_.push_back(&end); // Don't iterate over new dispatchers.
1603 while (i < end) {
1604 Dispatcher* disp = dispatchers_[i++];
1605 SOCKET s = disp->GetSocket();
1606 if (s == INVALID_SOCKET)
1607 continue;
1608
1609 WSANETWORKEVENTS wsaEvents;
1610 int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
1611 if (err == 0) {
1612
1613#if LOGGING
1614 {
1615 if ((wsaEvents.lNetworkEvents & FD_READ) &&
1616 wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
1617 LOG(WARNING) << "PhysicalSocketServer got FD_READ_BIT error "
1618 << wsaEvents.iErrorCode[FD_READ_BIT];
1619 }
1620 if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
1621 wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
1622 LOG(WARNING) << "PhysicalSocketServer got FD_WRITE_BIT error "
1623 << wsaEvents.iErrorCode[FD_WRITE_BIT];
1624 }
1625 if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
1626 wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
1627 LOG(WARNING) << "PhysicalSocketServer got FD_CONNECT_BIT error "
1628 << wsaEvents.iErrorCode[FD_CONNECT_BIT];
1629 }
1630 if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
1631 wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
1632 LOG(WARNING) << "PhysicalSocketServer got FD_ACCEPT_BIT error "
1633 << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
1634 }
1635 if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
1636 wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
1637 LOG(WARNING) << "PhysicalSocketServer got FD_CLOSE_BIT error "
1638 << wsaEvents.iErrorCode[FD_CLOSE_BIT];
1639 }
1640 }
1641#endif
1642 uint32 ff = 0;
1643 int errcode = 0;
1644 if (wsaEvents.lNetworkEvents & FD_READ)
1645 ff |= DE_READ;
1646 if (wsaEvents.lNetworkEvents & FD_WRITE)
1647 ff |= DE_WRITE;
1648 if (wsaEvents.lNetworkEvents & FD_CONNECT) {
1649 if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
1650 ff |= DE_CONNECT;
1651 } else {
1652 ff |= DE_CLOSE;
1653 errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
1654 }
1655 }
1656 if (wsaEvents.lNetworkEvents & FD_ACCEPT)
1657 ff |= DE_ACCEPT;
1658 if (wsaEvents.lNetworkEvents & FD_CLOSE) {
1659 ff |= DE_CLOSE;
1660 errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
1661 }
1662 if (ff != 0) {
1663 disp->OnPreEvent(ff);
1664 disp->OnEvent(ff, errcode);
1665 }
1666 }
1667 }
1668 ASSERT(iterators_.back() == &end);
1669 iterators_.pop_back();
1670 ASSERT(iterators_.back() == &i);
1671 iterators_.pop_back();
1672 }
1673
1674 // Reset the network event until new activity occurs
1675 WSAResetEvent(socket_ev_);
1676 }
1677
1678 // Break?
1679 if (!fWait_)
1680 break;
1681 cmsElapsed = TimeSince(msStart);
1682 if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
1683 break;
1684 }
1685 }
1686
1687 // Done
1688 return true;
1689}
1690#endif // WIN32
1691
1692} // namespace talk_base