blob: 7a61093c9b67fc898213a0e60a36c1a3278deb67 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/p2p/client/basicportallocator.h"
29
30#include <string>
31#include <vector>
32
33#include "talk/base/common.h"
34#include "talk/base/helpers.h"
35#include "talk/base/host.h"
36#include "talk/base/logging.h"
37#include "talk/p2p/base/basicpacketsocketfactory.h"
38#include "talk/p2p/base/common.h"
39#include "talk/p2p/base/port.h"
40#include "talk/p2p/base/relayport.h"
41#include "talk/p2p/base/stunport.h"
42#include "talk/p2p/base/tcpport.h"
43#include "talk/p2p/base/turnport.h"
44#include "talk/p2p/base/udpport.h"
45
46using talk_base::CreateRandomId;
47using talk_base::CreateRandomString;
48
49namespace {
50
51const uint32 MSG_CONFIG_START = 1;
52const uint32 MSG_CONFIG_READY = 2;
53const uint32 MSG_ALLOCATE = 3;
54const uint32 MSG_ALLOCATION_PHASE = 4;
55const uint32 MSG_SHAKE = 5;
56const uint32 MSG_SEQUENCEOBJECTS_CREATED = 6;
57const uint32 MSG_CONFIG_STOP = 7;
58
59const uint32 ALLOCATE_DELAY = 250;
60const uint32 ALLOCATION_STEP_DELAY = 1 * 1000;
61
62const int PHASE_UDP = 0;
63const int PHASE_RELAY = 1;
64const int PHASE_TCP = 2;
65const int PHASE_SSLTCP = 3;
66
67const int kNumPhases = 4;
68
69// Both these values are in bytes.
70const int kLargeSocketSendBufferSize = 128 * 1024;
71const int kNormalSocketSendBufferSize = 64 * 1024;
72
73const int SHAKE_MIN_DELAY = 45 * 1000; // 45 seconds
74const int SHAKE_MAX_DELAY = 90 * 1000; // 90 seconds
75
76int ShakeDelay() {
77 int range = SHAKE_MAX_DELAY - SHAKE_MIN_DELAY + 1;
78 return SHAKE_MIN_DELAY + CreateRandomId() % range;
79}
80
81} // namespace
82
83namespace cricket {
84
85const uint32 DISABLE_ALL_PHASES =
86 PORTALLOCATOR_DISABLE_UDP
87 | PORTALLOCATOR_DISABLE_TCP
88 | PORTALLOCATOR_DISABLE_STUN
89 | PORTALLOCATOR_DISABLE_RELAY;
90
91// Performs the allocation of ports, in a sequenced (timed) manner, for a given
92// network and IP address.
93class AllocationSequence : public talk_base::MessageHandler,
94 public sigslot::has_slots<> {
95 public:
96 enum State {
97 kInit, // Initial state.
98 kRunning, // Started allocating ports.
99 kStopped, // Stopped from running.
100 kCompleted, // All ports are allocated.
101
102 // kInit --> kRunning --> {kCompleted|kStopped}
103 };
104
105 AllocationSequence(BasicPortAllocatorSession* session,
106 talk_base::Network* network,
107 PortConfiguration* config,
108 uint32 flags);
109 ~AllocationSequence();
110 bool Init();
111
112 State state() const { return state_; }
113
114 // Disables the phases for a new sequence that this one already covers for an
115 // equivalent network setup.
116 void DisableEquivalentPhases(talk_base::Network* network,
117 PortConfiguration* config, uint32* flags);
118
119 // Starts and stops the sequence. When started, it will continue allocating
120 // new ports on its own timed schedule.
121 void Start();
122 void Stop();
123
124 // MessageHandler
125 void OnMessage(talk_base::Message* msg);
126
127 void EnableProtocol(ProtocolType proto);
128 bool ProtocolEnabled(ProtocolType proto) const;
129
130 // Signal from AllocationSequence, when it's done with allocating ports.
131 // This signal is useful, when port allocation fails which doesn't result
132 // in any candidates. Using this signal BasicPortAllocatorSession can send
133 // its candidate discovery conclusion signal. Without this signal,
134 // BasicPortAllocatorSession doesn't have any event to trigger signal. This
135 // can also be achieved by starting timer in BPAS.
136 sigslot::signal1<AllocationSequence*> SignalPortAllocationComplete;
137
138 private:
139 typedef std::vector<ProtocolType> ProtocolList;
140
141 bool IsFlagSet(uint32 flag) {
142 return ((flags_ & flag) != 0);
143 }
144 void CreateUDPPorts();
145 void CreateTCPPorts();
146 void CreateStunPorts();
147 void CreateRelayPorts();
148 void CreateGturnPort(const RelayServerConfig& config);
149 void CreateTurnPort(const RelayServerConfig& config);
150
151 void OnReadPacket(talk_base::AsyncPacketSocket* socket,
152 const char* data, size_t size,
153 const talk_base::SocketAddress& remote_addr);
154 void OnPortDestroyed(PortInterface* port);
155
156 BasicPortAllocatorSession* session_;
157 talk_base::Network* network_;
158 talk_base::IPAddress ip_;
159 PortConfiguration* config_;
160 State state_;
161 uint32 flags_;
162 ProtocolList protocols_;
163 talk_base::scoped_ptr<talk_base::AsyncPacketSocket> udp_socket_;
164 // Keeping a list of all UDP based ports.
165 std::deque<Port*> ports;
166 int phase_;
167};
168
169// BasicPortAllocator
170BasicPortAllocator::BasicPortAllocator(
171 talk_base::NetworkManager* network_manager,
172 talk_base::PacketSocketFactory* socket_factory)
173 : network_manager_(network_manager),
174 socket_factory_(socket_factory) {
175 ASSERT(socket_factory_ != NULL);
176 Construct();
177}
178
179BasicPortAllocator::BasicPortAllocator(
180 talk_base::NetworkManager* network_manager)
181 : network_manager_(network_manager),
182 socket_factory_(NULL) {
183 Construct();
184}
185
186BasicPortAllocator::BasicPortAllocator(
187 talk_base::NetworkManager* network_manager,
188 talk_base::PacketSocketFactory* socket_factory,
189 const talk_base::SocketAddress& stun_address)
190 : network_manager_(network_manager),
191 socket_factory_(socket_factory),
192 stun_address_(stun_address) {
193 ASSERT(socket_factory_ != NULL);
194 Construct();
195}
196
197BasicPortAllocator::BasicPortAllocator(
198 talk_base::NetworkManager* network_manager,
199 const talk_base::SocketAddress& stun_address,
200 const talk_base::SocketAddress& relay_address_udp,
201 const talk_base::SocketAddress& relay_address_tcp,
202 const talk_base::SocketAddress& relay_address_ssl)
203 : network_manager_(network_manager),
204 socket_factory_(NULL),
205 stun_address_(stun_address) {
206
207 RelayServerConfig config(RELAY_GTURN);
208 if (!relay_address_udp.IsAny())
209 config.ports.push_back(ProtocolAddress(relay_address_udp, PROTO_UDP));
210 if (!relay_address_tcp.IsAny())
211 config.ports.push_back(ProtocolAddress(relay_address_tcp, PROTO_TCP));
212 if (!relay_address_ssl.IsAny())
213 config.ports.push_back(ProtocolAddress(relay_address_ssl, PROTO_SSLTCP));
214 AddRelay(config);
215
216 Construct();
217}
218
219void BasicPortAllocator::Construct() {
220 allow_tcp_listen_ = true;
221}
222
223BasicPortAllocator::~BasicPortAllocator() {
224}
225
226PortAllocatorSession *BasicPortAllocator::CreateSessionInternal(
227 const std::string& content_name, int component,
228 const std::string& ice_ufrag, const std::string& ice_pwd) {
229 return new BasicPortAllocatorSession(this, content_name, component,
230 ice_ufrag, ice_pwd);
231}
232
233// BasicPortAllocatorSession
234BasicPortAllocatorSession::BasicPortAllocatorSession(
235 BasicPortAllocator *allocator,
236 const std::string& content_name,
237 int component,
238 const std::string& ice_ufrag,
239 const std::string& ice_pwd)
240 : PortAllocatorSession(content_name, component,
241 ice_ufrag, ice_pwd, allocator->flags()),
242 allocator_(allocator), network_thread_(NULL),
243 socket_factory_(allocator->socket_factory()),
244 configuration_done_(false),
245 allocation_started_(false),
246 network_manager_started_(false),
247 running_(false),
248 allocation_sequences_created_(false) {
249 allocator_->network_manager()->SignalNetworksChanged.connect(
250 this, &BasicPortAllocatorSession::OnNetworksChanged);
251 allocator_->network_manager()->StartUpdating();
252}
253
254BasicPortAllocatorSession::~BasicPortAllocatorSession() {
255 allocator_->network_manager()->StopUpdating();
256 if (network_thread_ != NULL)
257 network_thread_->Clear(this);
258
259 std::vector<PortData>::iterator it;
260 for (it = ports_.begin(); it != ports_.end(); it++)
261 delete it->port();
262
263 for (uint32 i = 0; i < configs_.size(); ++i)
264 delete configs_[i];
265
266 for (uint32 i = 0; i < sequences_.size(); ++i)
267 delete sequences_[i];
268}
269
270void BasicPortAllocatorSession::StartGettingPorts() {
271 network_thread_ = talk_base::Thread::Current();
272 if (!socket_factory_) {
273 owned_socket_factory_.reset(
274 new talk_base::BasicPacketSocketFactory(network_thread_));
275 socket_factory_ = owned_socket_factory_.get();
276 }
277
278 running_ = true;
279 network_thread_->Post(this, MSG_CONFIG_START);
280
281 if (flags() & PORTALLOCATOR_ENABLE_SHAKER)
282 network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE);
283}
284
285void BasicPortAllocatorSession::StopGettingPorts() {
286 ASSERT(talk_base::Thread::Current() == network_thread_);
287 running_ = false;
288 network_thread_->Clear(this, MSG_ALLOCATE);
289 for (uint32 i = 0; i < sequences_.size(); ++i)
290 sequences_[i]->Stop();
291 network_thread_->Post(this, MSG_CONFIG_STOP);
292}
293
294void BasicPortAllocatorSession::OnMessage(talk_base::Message *message) {
295 switch (message->message_id) {
296 case MSG_CONFIG_START:
297 ASSERT(talk_base::Thread::Current() == network_thread_);
298 GetPortConfigurations();
299 break;
300
301 case MSG_CONFIG_READY:
302 ASSERT(talk_base::Thread::Current() == network_thread_);
303 OnConfigReady(static_cast<PortConfiguration*>(message->pdata));
304 break;
305
306 case MSG_ALLOCATE:
307 ASSERT(talk_base::Thread::Current() == network_thread_);
308 OnAllocate();
309 break;
310
311 case MSG_SHAKE:
312 ASSERT(talk_base::Thread::Current() == network_thread_);
313 OnShake();
314 break;
315 case MSG_SEQUENCEOBJECTS_CREATED:
316 ASSERT(talk_base::Thread::Current() == network_thread_);
317 OnAllocationSequenceObjectsCreated();
318 break;
319 case MSG_CONFIG_STOP:
320 ASSERT(talk_base::Thread::Current() == network_thread_);
321 OnConfigStop();
322 break;
323 default:
324 ASSERT(false);
325 }
326}
327
328void BasicPortAllocatorSession::GetPortConfigurations() {
329 PortConfiguration* config = new PortConfiguration(allocator_->stun_address(),
330 username(),
331 password());
332
333 for (size_t i = 0; i < allocator_->relays().size(); ++i) {
334 config->AddRelay(allocator_->relays()[i]);
335 }
336 ConfigReady(config);
337}
338
339void BasicPortAllocatorSession::ConfigReady(PortConfiguration* config) {
340 network_thread_->Post(this, MSG_CONFIG_READY, config);
341}
342
343// Adds a configuration to the list.
344void BasicPortAllocatorSession::OnConfigReady(PortConfiguration* config) {
345 if (config)
346 configs_.push_back(config);
347
348 AllocatePorts();
349}
350
351void BasicPortAllocatorSession::OnConfigStop() {
352 ASSERT(talk_base::Thread::Current() == network_thread_);
353
354 // If any of the allocated ports have not completed the candidates allocation,
355 // mark those as error. Since session doesn't need any new candidates
356 // at this stage of the allocation, it's safe to discard any new candidates.
357 bool send_signal = false;
358 for (std::vector<PortData>::iterator it = ports_.begin();
359 it != ports_.end(); ++it) {
360 if (!it->complete()) {
361 // Updating port state to error, which didn't finish allocating candidates
362 // yet.
363 it->set_error();
364 send_signal = true;
365 }
366 }
367
368 // Did we stop any running sequences?
369 for (std::vector<AllocationSequence*>::iterator it = sequences_.begin();
370 it != sequences_.end() && !send_signal; ++it) {
371 if ((*it)->state() == AllocationSequence::kStopped) {
372 send_signal = true;
373 }
374 }
375
376 // If we stopped anything that was running, send a done signal now.
377 if (send_signal) {
378 MaybeSignalCandidatesAllocationDone();
379 }
380}
381
382void BasicPortAllocatorSession::AllocatePorts() {
383 ASSERT(talk_base::Thread::Current() == network_thread_);
384 network_thread_->Post(this, MSG_ALLOCATE);
385}
386
387void BasicPortAllocatorSession::OnAllocate() {
388 if (network_manager_started_)
389 DoAllocate();
390
391 allocation_started_ = true;
392 if (running_)
393 network_thread_->PostDelayed(ALLOCATE_DELAY, this, MSG_ALLOCATE);
394}
395
396// For each network, see if we have a sequence that covers it already. If not,
397// create a new sequence to create the appropriate ports.
398void BasicPortAllocatorSession::DoAllocate() {
399 bool done_signal_needed = false;
400 std::vector<talk_base::Network*> networks;
401 allocator_->network_manager()->GetNetworks(&networks);
402 if (networks.empty()) {
403 LOG(LS_WARNING) << "Machine has no networks; no ports will be allocated";
404 done_signal_needed = true;
405 } else {
406 for (uint32 i = 0; i < networks.size(); ++i) {
407 PortConfiguration* config = NULL;
408 if (configs_.size() > 0)
409 config = configs_.back();
410
411 uint32 sequence_flags = flags();
412 if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
413 // If all the ports are disabled we should just fire the allocation
414 // done event and return.
415 done_signal_needed = true;
416 break;
417 }
418
419 // Disables phases that are not specified in this config.
420 if (!config || config->stun_address.IsNil()) {
421 // No STUN ports specified in this config.
422 sequence_flags |= PORTALLOCATOR_DISABLE_STUN;
423 }
424 if (!config || config->relays.empty()) {
425 // No relay ports specified in this config.
426 sequence_flags |= PORTALLOCATOR_DISABLE_RELAY;
427 }
428
429 if (!(sequence_flags & PORTALLOCATOR_ENABLE_IPV6) &&
430 networks[i]->ip().family() == AF_INET6) {
431 // Skip IPv6 networks unless the flag's been set.
432 continue;
433 }
434
435 // Disable phases that would only create ports equivalent to
436 // ones that we have already made.
437 DisableEquivalentPhases(networks[i], config, &sequence_flags);
438
439 if ((sequence_flags & DISABLE_ALL_PHASES) == DISABLE_ALL_PHASES) {
440 // New AllocationSequence would have nothing to do, so don't make it.
441 continue;
442 }
443
444 AllocationSequence* sequence =
445 new AllocationSequence(this, networks[i], config, sequence_flags);
446 if (!sequence->Init()) {
447 delete sequence;
448 continue;
449 }
450 done_signal_needed = true;
451 sequence->SignalPortAllocationComplete.connect(
452 this, &BasicPortAllocatorSession::OnPortAllocationComplete);
453 if (running_)
454 sequence->Start();
455 sequences_.push_back(sequence);
456 }
457 }
458 if (done_signal_needed) {
459 network_thread_->Post(this, MSG_SEQUENCEOBJECTS_CREATED);
460 }
461}
462
463void BasicPortAllocatorSession::OnNetworksChanged() {
464 network_manager_started_ = true;
465 if (allocation_started_)
466 DoAllocate();
467}
468
469void BasicPortAllocatorSession::DisableEquivalentPhases(
470 talk_base::Network* network, PortConfiguration* config, uint32* flags) {
471 for (uint32 i = 0; i < sequences_.size() &&
472 (*flags & DISABLE_ALL_PHASES) != DISABLE_ALL_PHASES; ++i) {
473 sequences_[i]->DisableEquivalentPhases(network, config, flags);
474 }
475}
476
477void BasicPortAllocatorSession::AddAllocatedPort(Port* port,
478 AllocationSequence * seq,
479 bool prepare_address) {
480 if (!port)
481 return;
482
483 LOG(LS_INFO) << "Adding allocated port for " << content_name();
484 port->set_content_name(content_name());
485 port->set_component(component_);
486 port->set_generation(generation());
487 if (allocator_->proxy().type != talk_base::PROXY_NONE)
488 port->set_proxy(allocator_->user_agent(), allocator_->proxy());
489 port->set_send_retransmit_count_attribute((allocator_->flags() &
490 PORTALLOCATOR_ENABLE_STUN_RETRANSMIT_ATTRIBUTE) != 0);
491
492 if (content_name().compare(CN_VIDEO) == 0 &&
493 component_ == cricket::ICE_CANDIDATE_COMPONENT_RTP) {
494 // For video RTP alone, we set send-buffer sizes. This used to be set in the
495 // engines/channels.
496 int sendBufSize = (flags() & PORTALLOCATOR_USE_LARGE_SOCKET_SEND_BUFFERS)
497 ? kLargeSocketSendBufferSize
498 : kNormalSocketSendBufferSize;
499 port->SetOption(talk_base::Socket::OPT_SNDBUF, sendBufSize);
500 }
501
502 PortData data(port, seq);
503 ports_.push_back(data);
504
505 port->SignalCandidateReady.connect(
506 this, &BasicPortAllocatorSession::OnCandidateReady);
507 port->SignalPortComplete.connect(this,
508 &BasicPortAllocatorSession::OnPortComplete);
509 port->SignalDestroyed.connect(this,
510 &BasicPortAllocatorSession::OnPortDestroyed);
511 port->SignalPortError.connect(
512 this, &BasicPortAllocatorSession::OnPortError);
513 LOG_J(LS_INFO, port) << "Added port to allocator";
514
515 if (prepare_address)
516 port->PrepareAddress();
517 if (running_)
518 port->Start();
519}
520
521void BasicPortAllocatorSession::OnAllocationSequenceObjectsCreated() {
522 allocation_sequences_created_ = true;
523 // Send candidate allocation complete signal if we have no sequences.
524 MaybeSignalCandidatesAllocationDone();
525}
526
527void BasicPortAllocatorSession::OnCandidateReady(
528 Port* port, const Candidate& c) {
529 ASSERT(talk_base::Thread::Current() == network_thread_);
530 PortData* data = FindPort(port);
531 ASSERT(data != NULL);
532 // Discarding any candidate signal if port allocation status is
533 // already in completed state.
534 if (data->complete())
535 return;
536
537 // Send candidates whose protocol is enabled.
538 std::vector<Candidate> candidates;
539 ProtocolType pvalue;
540 if (StringToProto(c.protocol().c_str(), &pvalue) &&
541 data->sequence()->ProtocolEnabled(pvalue)) {
542 candidates.push_back(c);
543 }
544
545 if (!candidates.empty()) {
546 SignalCandidatesReady(this, candidates);
547 }
548
549 // Moving to READY state as we have atleast one candidate from the port.
550 // Since this port has atleast one candidate we should forward this port
551 // to listners, to allow connections from this port.
552 if (!data->ready()) {
553 data->set_ready();
554 SignalPortReady(this, port);
555 }
556}
557
558void BasicPortAllocatorSession::OnPortComplete(Port* port) {
559 ASSERT(talk_base::Thread::Current() == network_thread_);
560 PortData* data = FindPort(port);
561 ASSERT(data != NULL);
562
563 // Ignore any late signals.
564 if (data->complete())
565 return;
566
567 // Moving to COMPLETE state.
568 data->set_complete();
569 // Send candidate allocation complete signal if this was the last port.
570 MaybeSignalCandidatesAllocationDone();
571}
572
573void BasicPortAllocatorSession::OnPortError(Port* port) {
574 ASSERT(talk_base::Thread::Current() == network_thread_);
575 PortData* data = FindPort(port);
576 ASSERT(data != NULL);
577 // We might have already given up on this port and stopped it.
578 if (data->complete())
579 return;
580
581 // SignalAddressError is currently sent from StunPort/TurnPort.
582 // But this signal itself is generic.
583 data->set_error();
584 // Send candidate allocation complete signal if this was the last port.
585 MaybeSignalCandidatesAllocationDone();
586}
587
588void BasicPortAllocatorSession::OnProtocolEnabled(AllocationSequence* seq,
589 ProtocolType proto) {
590 std::vector<Candidate> candidates;
591 for (std::vector<PortData>::iterator it = ports_.begin();
592 it != ports_.end(); ++it) {
593 if (it->sequence() != seq)
594 continue;
595
596 const std::vector<Candidate>& potentials = it->port()->Candidates();
597 for (size_t i = 0; i < potentials.size(); ++i) {
598 ProtocolType pvalue;
599 if (!StringToProto(potentials[i].protocol().c_str(), &pvalue))
600 continue;
601 if (pvalue == proto) {
602 candidates.push_back(potentials[i]);
603 }
604 }
605 }
606
607 if (!candidates.empty()) {
608 SignalCandidatesReady(this, candidates);
609 }
610}
611
612void BasicPortAllocatorSession::OnPortAllocationComplete(
613 AllocationSequence* seq) {
614 // Send candidate allocation complete signal if all ports are done.
615 MaybeSignalCandidatesAllocationDone();
616}
617
618void BasicPortAllocatorSession::MaybeSignalCandidatesAllocationDone() {
619 // Send signal only if all required AllocationSequence objects
620 // are created.
621 if (!allocation_sequences_created_)
622 return;
623
624 // Check that all port allocation sequences are complete.
625 for (std::vector<AllocationSequence*>::iterator it = sequences_.begin();
626 it != sequences_.end(); ++it) {
627 if ((*it)->state() == AllocationSequence::kRunning)
628 return;
629 }
630
631 // If all allocated ports are in complete state, session must have got all
632 // expected candidates. Session will trigger candidates allocation complete
633 // signal.
634 for (std::vector<PortData>::iterator it = ports_.begin();
635 it != ports_.end(); ++it) {
636 if (!it->complete())
637 return;
638 }
639 LOG(LS_INFO) << "All candidates gathered for " << content_name_ << ":"
640 << component_ << ":" << generation();
641 SignalCandidatesAllocationDone(this);
642}
643
644void BasicPortAllocatorSession::OnPortDestroyed(
645 PortInterface* port) {
646 ASSERT(talk_base::Thread::Current() == network_thread_);
647 for (std::vector<PortData>::iterator iter = ports_.begin();
648 iter != ports_.end(); ++iter) {
649 if (port == iter->port()) {
650 ports_.erase(iter);
651 LOG_J(LS_INFO, port) << "Removed port from allocator ("
652 << static_cast<int>(ports_.size()) << " remaining)";
653 return;
654 }
655 }
656 ASSERT(false);
657}
658
659void BasicPortAllocatorSession::OnShake() {
660 LOG(INFO) << ">>>>> SHAKE <<<<< >>>>> SHAKE <<<<< >>>>> SHAKE <<<<<";
661
662 std::vector<Port*> ports;
663 std::vector<Connection*> connections;
664
665 for (size_t i = 0; i < ports_.size(); ++i) {
666 if (ports_[i].ready())
667 ports.push_back(ports_[i].port());
668 }
669
670 for (size_t i = 0; i < ports.size(); ++i) {
671 Port::AddressMap::const_iterator iter;
672 for (iter = ports[i]->connections().begin();
673 iter != ports[i]->connections().end();
674 ++iter) {
675 connections.push_back(iter->second);
676 }
677 }
678
679 LOG(INFO) << ">>>>> Destroying " << ports.size() << " ports and "
680 << connections.size() << " connections";
681
682 for (size_t i = 0; i < connections.size(); ++i)
683 connections[i]->Destroy();
684
685 if (running_ || (ports.size() > 0) || (connections.size() > 0))
686 network_thread_->PostDelayed(ShakeDelay(), this, MSG_SHAKE);
687}
688
689BasicPortAllocatorSession::PortData* BasicPortAllocatorSession::FindPort(
690 Port* port) {
691 for (std::vector<PortData>::iterator it = ports_.begin();
692 it != ports_.end(); ++it) {
693 if (it->port() == port) {
694 return &*it;
695 }
696 }
697 return NULL;
698}
699
700// AllocationSequence
701
702AllocationSequence::AllocationSequence(BasicPortAllocatorSession* session,
703 talk_base::Network* network,
704 PortConfiguration* config,
705 uint32 flags)
706 : session_(session),
707 network_(network),
708 ip_(network->ip()),
709 config_(config),
710 state_(kInit),
711 flags_(flags),
712 udp_socket_(NULL),
713 phase_(0) {
714}
715
716bool AllocationSequence::Init() {
717 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) &&
718 !IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_UFRAG)) {
719 LOG(LS_ERROR) << "Shared socket option can't be set without "
720 << "shared ufrag.";
721 ASSERT(false);
722 return false;
723 }
724
725 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
726 udp_socket_.reset(session_->socket_factory()->CreateUdpSocket(
727 talk_base::SocketAddress(ip_, 0), session_->allocator()->min_port(),
728 session_->allocator()->max_port()));
729 if (udp_socket_) {
730 udp_socket_->SignalReadPacket.connect(
731 this, &AllocationSequence::OnReadPacket);
732 }
733 // Continuing if |udp_socket_| is NULL, as local TCP and RelayPort using TCP
734 // are next available options to setup a communication channel.
735 }
736 return true;
737}
738
739AllocationSequence::~AllocationSequence() {
740 session_->network_thread()->Clear(this);
741}
742
743void AllocationSequence::DisableEquivalentPhases(talk_base::Network* network,
744 PortConfiguration* config, uint32* flags) {
745 if (!((network == network_) && (ip_ == network->ip()))) {
746 // Different network setup; nothing is equivalent.
747 return;
748 }
749
750 // Else turn off the stuff that we've already got covered.
751
752 // Every config implicitly specifies local, so turn that off right away.
753 *flags |= PORTALLOCATOR_DISABLE_UDP;
754 *flags |= PORTALLOCATOR_DISABLE_TCP;
755
756 if (config_ && config) {
757 if (config_->stun_address == config->stun_address) {
758 // Already got this STUN server covered.
759 *flags |= PORTALLOCATOR_DISABLE_STUN;
760 }
761 if (!config_->relays.empty()) {
762 // Already got relays covered.
763 // NOTE: This will even skip a _different_ set of relay servers if we
764 // were to be given one, but that never happens in our codebase. Should
765 // probably get rid of the list in PortConfiguration and just keep a
766 // single relay server in each one.
767 *flags |= PORTALLOCATOR_DISABLE_RELAY;
768 }
769 }
770}
771
772void AllocationSequence::Start() {
773 state_ = kRunning;
774 session_->network_thread()->Post(this, MSG_ALLOCATION_PHASE);
775}
776
777void AllocationSequence::Stop() {
778 // If the port is completed, don't set it to stopped.
779 if (state_ == kRunning) {
780 state_ = kStopped;
781 session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
782 }
783}
784
785void AllocationSequence::OnMessage(talk_base::Message* msg) {
786 ASSERT(talk_base::Thread::Current() == session_->network_thread());
787 ASSERT(msg->message_id == MSG_ALLOCATION_PHASE);
788
789 const char* const PHASE_NAMES[kNumPhases] = {
790 "Udp", "Relay", "Tcp", "SslTcp"
791 };
792
793 // Perform all of the phases in the current step.
794 LOG_J(LS_INFO, network_) << "Allocation Phase="
795 << PHASE_NAMES[phase_];
796
797 switch (phase_) {
798 case PHASE_UDP:
799 CreateUDPPorts();
800 CreateStunPorts();
801 EnableProtocol(PROTO_UDP);
802 break;
803
804 case PHASE_RELAY:
805 CreateRelayPorts();
806 break;
807
808 case PHASE_TCP:
809 CreateTCPPorts();
810 EnableProtocol(PROTO_TCP);
811 break;
812
813 case PHASE_SSLTCP:
814 state_ = kCompleted;
815 EnableProtocol(PROTO_SSLTCP);
816 break;
817
818 default:
819 ASSERT(false);
820 }
821
822 if (state() == kRunning) {
823 ++phase_;
824 session_->network_thread()->PostDelayed(
825 session_->allocator()->step_delay(),
826 this, MSG_ALLOCATION_PHASE);
827 } else {
828 // If all phases in AllocationSequence are completed, no allocation
829 // steps needed further. Canceling pending signal.
830 session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
831 SignalPortAllocationComplete(this);
832 }
833}
834
835void AllocationSequence::EnableProtocol(ProtocolType proto) {
836 if (!ProtocolEnabled(proto)) {
837 protocols_.push_back(proto);
838 session_->OnProtocolEnabled(this, proto);
839 }
840}
841
842bool AllocationSequence::ProtocolEnabled(ProtocolType proto) const {
843 for (ProtocolList::const_iterator it = protocols_.begin();
844 it != protocols_.end(); ++it) {
845 if (*it == proto)
846 return true;
847 }
848 return false;
849}
850
851void AllocationSequence::CreateUDPPorts() {
852 if (IsFlagSet(PORTALLOCATOR_DISABLE_UDP)) {
853 LOG(LS_VERBOSE) << "AllocationSequence: UDP ports disabled, skipping.";
854 return;
855 }
856
857 // TODO(mallinath) - Remove UDPPort creating socket after shared socket
858 // is enabled completely.
859 UDPPort* port = NULL;
860 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) && udp_socket_) {
861 port = UDPPort::Create(session_->network_thread(), network_,
862 udp_socket_.get(),
863 session_->username(), session_->password());
864 } else {
865 port = UDPPort::Create(session_->network_thread(),
866 session_->socket_factory(),
867 network_, ip_,
868 session_->allocator()->min_port(),
869 session_->allocator()->max_port(),
870 session_->username(), session_->password());
871 }
872
873 if (port) {
874 ports.push_back(port);
875 // If shared socket is enabled, STUN candidate will be allocated by the
876 // UDPPort.
877 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET) &&
878 !IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
879 ASSERT(config_ && !config_->stun_address.IsNil());
880 if (!(config_ && !config_->stun_address.IsNil())) {
881 LOG(LS_WARNING)
882 << "AllocationSequence: No STUN server configured, skipping.";
883 return;
884 }
885 port->set_server_addr(config_->stun_address);
886 }
887
888 session_->AddAllocatedPort(port, this, true);
889 port->SignalDestroyed.connect(this, &AllocationSequence::OnPortDestroyed);
890 }
891}
892
893void AllocationSequence::CreateTCPPorts() {
894 if (IsFlagSet(PORTALLOCATOR_DISABLE_TCP)) {
895 LOG(LS_VERBOSE) << "AllocationSequence: TCP ports disabled, skipping.";
896 return;
897 }
898
899 Port* port = TCPPort::Create(session_->network_thread(),
900 session_->socket_factory(),
901 network_, ip_,
902 session_->allocator()->min_port(),
903 session_->allocator()->max_port(),
904 session_->username(), session_->password(),
905 session_->allocator()->allow_tcp_listen());
906 if (port) {
907 session_->AddAllocatedPort(port, this, true);
908 // Since TCPPort is not created using shared socket, |port| will not be
909 // added to the dequeue.
910 }
911}
912
913void AllocationSequence::CreateStunPorts() {
914 if (IsFlagSet(PORTALLOCATOR_DISABLE_STUN)) {
915 LOG(LS_VERBOSE) << "AllocationSequence: STUN ports disabled, skipping.";
916 return;
917 }
918
919 if (IsFlagSet(PORTALLOCATOR_ENABLE_SHARED_SOCKET)) {
920 LOG(LS_INFO) << "AllocationSequence: "
921 << "UDPPort will be handling the STUN candidate generation.";
922 return;
923 }
924
925 // If BasicPortAllocatorSession::OnAllocate left STUN ports enabled then we
926 // ought to have an address for them here.
927 ASSERT(config_ && !config_->stun_address.IsNil());
928 if (!(config_ && !config_->stun_address.IsNil())) {
929 LOG(LS_WARNING)
930 << "AllocationSequence: No STUN server configured, skipping.";
931 return;
932 }
933
934 StunPort* port = StunPort::Create(session_->network_thread(),
935 session_->socket_factory(),
936 network_, ip_,
937 session_->allocator()->min_port(),
938 session_->allocator()->max_port(),
939 session_->username(), session_->password(),
940 config_->stun_address);
941 if (port) {
942 session_->AddAllocatedPort(port, this, true);
943 // Since StunPort is not created using shared socket, |port| will not be
944 // added to the dequeue.
945 }
946}
947
948void AllocationSequence::CreateRelayPorts() {
949 if (IsFlagSet(PORTALLOCATOR_DISABLE_RELAY)) {
950 LOG(LS_VERBOSE) << "AllocationSequence: Relay ports disabled, skipping.";
951 return;
952 }
953
954 // If BasicPortAllocatorSession::OnAllocate left relay ports enabled then we
955 // ought to have a relay list for them here.
956 ASSERT(config_ && !config_->relays.empty());
957 if (!(config_ && !config_->relays.empty())) {
958 LOG(LS_WARNING)
959 << "AllocationSequence: No relay server configured, skipping.";
960 return;
961 }
962
963 PortConfiguration::RelayList::const_iterator relay;
964 for (relay = config_->relays.begin();
965 relay != config_->relays.end(); ++relay) {
966 if (relay->type == RELAY_GTURN) {
967 CreateGturnPort(*relay);
968 } else if (relay->type == RELAY_TURN) {
969 CreateTurnPort(*relay);
970 } else {
971 ASSERT(false);
972 }
973 }
974}
975
976void AllocationSequence::CreateGturnPort(const RelayServerConfig& config) {
977 // TODO(mallinath) - Rename RelayPort to GTurnPort.
978 RelayPort* port = RelayPort::Create(session_->network_thread(),
979 session_->socket_factory(),
980 network_, ip_,
981 session_->allocator()->min_port(),
982 session_->allocator()->max_port(),
983 config_->username, config_->password);
984 if (port) {
985 // Since RelayPort is not created using shared socket, |port| will not be
986 // added to the dequeue.
987 // Note: We must add the allocated port before we add addresses because
988 // the latter will create candidates that need name and preference
989 // settings. However, we also can't prepare the address (normally
990 // done by AddAllocatedPort) until we have these addresses. So we
991 // wait to do that until below.
992 session_->AddAllocatedPort(port, this, false);
993
994 // Add the addresses of this protocol.
995 PortList::const_iterator relay_port;
996 for (relay_port = config.ports.begin();
997 relay_port != config.ports.end();
998 ++relay_port) {
999 port->AddServerAddress(*relay_port);
1000 port->AddExternalAddress(*relay_port);
1001 }
1002 // Start fetching an address for this port.
1003 port->PrepareAddress();
1004 }
1005}
1006
1007void AllocationSequence::CreateTurnPort(const RelayServerConfig& config) {
1008 PortList::const_iterator relay_port;
1009 for (relay_port = config.ports.begin();
1010 relay_port != config.ports.end(); ++relay_port) {
1011 TurnPort* port = TurnPort::Create(session_->network_thread(),
1012 session_->socket_factory(),
1013 network_, ip_,
1014 session_->allocator()->min_port(),
1015 session_->allocator()->max_port(),
1016 session_->username(),
1017 session_->password(),
1018 *relay_port, config.credentials);
1019 if (port) {
1020 session_->AddAllocatedPort(port, this, true);
1021 }
1022 }
1023}
1024
1025void AllocationSequence::OnReadPacket(
1026 talk_base::AsyncPacketSocket* socket, const char* data, size_t size,
1027 const talk_base::SocketAddress& remote_addr) {
1028 ASSERT(socket == udp_socket_.get());
1029 for (std::deque<Port*>::iterator iter = ports.begin();
1030 iter != ports.end(); ++iter) {
1031 // We have only one port in the queue.
1032 // TODO(mallinath) - Add shared socket support to Relay and Turn ports.
1033 if ((*iter)->HandleIncomingPacket(socket, data, size, remote_addr)) {
1034 break;
1035 }
1036 }
1037}
1038
1039void AllocationSequence::OnPortDestroyed(PortInterface* port) {
1040 std::deque<Port*>::iterator iter =
1041 std::find(ports.begin(), ports.end(), port);
1042 ASSERT(iter != ports.end());
1043 ports.erase(iter);
1044}
1045
1046// PortConfiguration
1047PortConfiguration::PortConfiguration(
1048 const talk_base::SocketAddress& stun_address,
1049 const std::string& username,
1050 const std::string& password)
1051 : stun_address(stun_address),
1052 username(username),
1053 password(password) {
1054}
1055
1056void PortConfiguration::AddRelay(const RelayServerConfig& config) {
1057 relays.push_back(config);
1058}
1059
1060bool PortConfiguration::SupportsProtocol(
1061 const RelayServerConfig& relay, ProtocolType type) {
1062 PortList::const_iterator relay_port;
1063 for (relay_port = relay.ports.begin();
1064 relay_port != relay.ports.end();
1065 ++relay_port) {
1066 if (relay_port->proto == type)
1067 return true;
1068 }
1069 return false;
1070}
1071
1072} // namespace cricket