blob: 9749b14ef7e024a96ae5f24e201e98514b01a570 [file] [log] [blame]
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/p2p/base/session.h"
12
13#include "webrtc/p2p/base/dtlstransport.h"
14#include "webrtc/p2p/base/p2ptransport.h"
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +000015#include "webrtc/p2p/base/sessionclient.h"
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +000016#include "webrtc/p2p/base/transport.h"
17#include "webrtc/p2p/base/transportchannelproxy.h"
18#include "webrtc/p2p/base/transportinfo.h"
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +000019#include "webrtc/libjingle/xmpp/constants.h"
20#include "webrtc/libjingle/xmpp/jid.h"
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +000021#include "webrtc/base/bind.h"
22#include "webrtc/base/common.h"
23#include "webrtc/base/helpers.h"
24#include "webrtc/base/logging.h"
25#include "webrtc/base/scoped_ptr.h"
26#include "webrtc/base/sslstreamadapter.h"
27
28#include "webrtc/p2p/base/constants.h"
29
30namespace cricket {
31
32using rtc::Bind;
33
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +000034bool BadMessage(const buzz::QName type,
35 const std::string& text,
36 MessageError* err) {
37 err->SetType(type);
38 err->SetText(text);
39 return false;
40}
41
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +000042TransportProxy::~TransportProxy() {
43 for (ChannelMap::iterator iter = channels_.begin();
44 iter != channels_.end(); ++iter) {
45 iter->second->SignalDestroyed(iter->second);
46 delete iter->second;
47 }
48}
49
50const std::string& TransportProxy::type() const {
51 return transport_->get()->type();
52}
53
54TransportChannel* TransportProxy::GetChannel(int component) {
55 ASSERT(rtc::Thread::Current() == worker_thread_);
56 return GetChannelProxy(component);
57}
58
59TransportChannel* TransportProxy::CreateChannel(
60 const std::string& name, int component) {
61 ASSERT(rtc::Thread::Current() == worker_thread_);
62 ASSERT(GetChannel(component) == NULL);
63 ASSERT(!transport_->get()->HasChannel(component));
64
65 // We always create a proxy in case we need to change out the transport later.
66 TransportChannelProxy* channel =
67 new TransportChannelProxy(content_name(), name, component);
68 channels_[component] = channel;
69
70 // If we're already negotiated, create an impl and hook it up to the proxy
71 // channel. If we're connecting, create an impl but don't hook it up yet.
72 if (negotiated_) {
73 SetupChannelProxy_w(component, channel);
74 } else if (connecting_) {
75 GetOrCreateChannelProxyImpl_w(component);
76 }
77 return channel;
78}
79
80bool TransportProxy::HasChannel(int component) {
81 return transport_->get()->HasChannel(component);
82}
83
84void TransportProxy::DestroyChannel(int component) {
85 ASSERT(rtc::Thread::Current() == worker_thread_);
86 TransportChannel* channel = GetChannel(component);
87 if (channel) {
88 // If the state of TransportProxy is not NEGOTIATED
89 // then TransportChannelProxy and its impl are not
90 // connected. Both must be connected before
91 // deletion.
92 if (!negotiated_) {
93 SetupChannelProxy_w(component, GetChannelProxy(component));
94 }
95
96 channels_.erase(component);
97 channel->SignalDestroyed(channel);
98 delete channel;
99 }
100}
101
102void TransportProxy::ConnectChannels() {
103 if (!connecting_) {
104 if (!negotiated_) {
105 for (ChannelMap::iterator iter = channels_.begin();
106 iter != channels_.end(); ++iter) {
107 GetOrCreateChannelProxyImpl(iter->first);
108 }
109 }
110 connecting_ = true;
111 }
112 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
113 // don't have any channels yet, so we need to allow this method to be called
114 // multiple times. Once we fix Transport, we can move this call inside the
115 // if (!connecting_) block.
116 transport_->get()->ConnectChannels();
117}
118
119void TransportProxy::CompleteNegotiation() {
120 if (!negotiated_) {
121 for (ChannelMap::iterator iter = channels_.begin();
122 iter != channels_.end(); ++iter) {
123 SetupChannelProxy(iter->first, iter->second);
124 }
125 negotiated_ = true;
126 }
127}
128
129void TransportProxy::AddSentCandidates(const Candidates& candidates) {
130 for (Candidates::const_iterator cand = candidates.begin();
131 cand != candidates.end(); ++cand) {
132 sent_candidates_.push_back(*cand);
133 }
134}
135
136void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
137 for (Candidates::const_iterator cand = candidates.begin();
138 cand != candidates.end(); ++cand) {
139 unsent_candidates_.push_back(*cand);
140 }
141}
142
143bool TransportProxy::GetChannelNameFromComponent(
144 int component, std::string* channel_name) const {
145 const TransportChannelProxy* channel = GetChannelProxy(component);
146 if (channel == NULL) {
147 return false;
148 }
149
150 *channel_name = channel->name();
151 return true;
152}
153
154bool TransportProxy::GetComponentFromChannelName(
155 const std::string& channel_name, int* component) const {
156 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
157 if (channel == NULL) {
158 return false;
159 }
160
161 *component = channel->component();
162 return true;
163}
164
165TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
166 ChannelMap::const_iterator iter = channels_.find(component);
167 return (iter != channels_.end()) ? iter->second : NULL;
168}
169
170TransportChannelProxy* TransportProxy::GetChannelProxyByName(
171 const std::string& name) const {
172 for (ChannelMap::const_iterator iter = channels_.begin();
173 iter != channels_.end();
174 ++iter) {
175 if (iter->second->name() == name) {
176 return iter->second;
177 }
178 }
179 return NULL;
180}
181
182TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
183 int component) {
184 return worker_thread_->Invoke<TransportChannelImpl*>(Bind(
185 &TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
186}
187
188TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w(
189 int component) {
190 ASSERT(rtc::Thread::Current() == worker_thread_);
191 TransportChannelImpl* impl = transport_->get()->GetChannel(component);
192 if (impl == NULL) {
193 impl = transport_->get()->CreateChannel(component);
194 }
195 return impl;
196}
197
198void TransportProxy::SetupChannelProxy(
199 int component, TransportChannelProxy* transproxy) {
200 worker_thread_->Invoke<void>(Bind(
201 &TransportProxy::SetupChannelProxy_w, this, component, transproxy));
202}
203
204void TransportProxy::SetupChannelProxy_w(
205 int component, TransportChannelProxy* transproxy) {
206 ASSERT(rtc::Thread::Current() == worker_thread_);
207 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
208 ASSERT(impl != NULL);
209 transproxy->SetImplementation(impl);
210}
211
212void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
213 TransportChannelImpl* impl) {
214 worker_thread_->Invoke<void>(Bind(
215 &TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl));
216}
217
218void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy,
219 TransportChannelImpl* impl) {
220 ASSERT(rtc::Thread::Current() == worker_thread_);
221 ASSERT(proxy != NULL);
222 proxy->SetImplementation(impl);
223}
224
225// This function muxes |this| onto |target| by repointing |this| at
226// |target|'s transport and setting our TransportChannelProxies
227// to point to |target|'s underlying implementations.
228bool TransportProxy::SetupMux(TransportProxy* target) {
229 // Bail out if there's nothing to do.
230 if (transport_ == target->transport_) {
231 return true;
232 }
233
234 // Run through all channels and remove any non-rtp transport channels before
235 // setting target transport channels.
236 for (ChannelMap::const_iterator iter = channels_.begin();
237 iter != channels_.end(); ++iter) {
238 if (!target->transport_->get()->HasChannel(iter->first)) {
239 // Remove if channel doesn't exist in |transport_|.
240 ReplaceChannelProxyImpl(iter->second, NULL);
241 } else {
242 // Replace the impl for all the TransportProxyChannels with the channels
243 // from |target|'s transport. Fail if there's not an exact match.
244 ReplaceChannelProxyImpl(
245 iter->second, target->transport_->get()->CreateChannel(iter->first));
246 }
247 }
248
249 // Now replace our transport. Must happen afterwards because
250 // it deletes all impls as a side effect.
251 transport_ = target->transport_;
252 transport_->get()->SignalCandidatesReady.connect(
253 this, &TransportProxy::OnTransportCandidatesReady);
254 set_candidates_allocated(target->candidates_allocated());
255 return true;
256}
257
258void TransportProxy::SetIceRole(IceRole role) {
259 transport_->get()->SetIceRole(role);
260}
261
262bool TransportProxy::SetLocalTransportDescription(
263 const TransportDescription& description,
264 ContentAction action,
265 std::string* error_desc) {
266 // If this is an answer, finalize the negotiation.
267 if (action == CA_ANSWER) {
268 CompleteNegotiation();
269 }
270 bool result = transport_->get()->SetLocalTransportDescription(description,
271 action,
272 error_desc);
273 if (result)
274 local_description_set_ = true;
275 return result;
276}
277
278bool TransportProxy::SetRemoteTransportDescription(
279 const TransportDescription& description,
280 ContentAction action,
281 std::string* error_desc) {
282 // If this is an answer, finalize the negotiation.
283 if (action == CA_ANSWER) {
284 CompleteNegotiation();
285 }
286 bool result = transport_->get()->SetRemoteTransportDescription(description,
287 action,
288 error_desc);
289 if (result)
290 remote_description_set_ = true;
291 return result;
292}
293
294void TransportProxy::OnSignalingReady() {
295 // If we're starting a new allocation sequence, reset our state.
296 set_candidates_allocated(false);
297 transport_->get()->OnSignalingReady();
298}
299
300bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
301 std::string* error) {
302 // Ensure the transport is negotiated before handling candidates.
303 // TODO(juberti): Remove this once everybody calls SetLocalTD.
304 CompleteNegotiation();
305
306 // Verify each candidate before passing down to transport layer.
307 for (Candidates::const_iterator cand = candidates.begin();
308 cand != candidates.end(); ++cand) {
309 if (!transport_->get()->VerifyCandidate(*cand, error))
310 return false;
311 if (!HasChannel(cand->component())) {
312 *error = "Candidate has unknown component: " + cand->ToString() +
313 " for content: " + content_name_;
314 return false;
315 }
316 }
317 transport_->get()->OnRemoteCandidates(candidates);
318 return true;
319}
320
321void TransportProxy::SetIdentity(
322 rtc::SSLIdentity* identity) {
323 transport_->get()->SetIdentity(identity);
324}
325
326std::string BaseSession::StateToString(State state) {
327 switch (state) {
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000328 case Session::STATE_INIT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000329 return "STATE_INIT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000330 case Session::STATE_SENTINITIATE:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000331 return "STATE_SENTINITIATE";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000332 case Session::STATE_RECEIVEDINITIATE:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000333 return "STATE_RECEIVEDINITIATE";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000334 case Session::STATE_SENTPRACCEPT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000335 return "STATE_SENTPRACCEPT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000336 case Session::STATE_SENTACCEPT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000337 return "STATE_SENTACCEPT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000338 case Session::STATE_RECEIVEDPRACCEPT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000339 return "STATE_RECEIVEDPRACCEPT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000340 case Session::STATE_RECEIVEDACCEPT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000341 return "STATE_RECEIVEDACCEPT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000342 case Session::STATE_SENTMODIFY:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000343 return "STATE_SENTMODIFY";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000344 case Session::STATE_RECEIVEDMODIFY:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000345 return "STATE_RECEIVEDMODIFY";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000346 case Session::STATE_SENTREJECT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000347 return "STATE_SENTREJECT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000348 case Session::STATE_RECEIVEDREJECT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000349 return "STATE_RECEIVEDREJECT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000350 case Session::STATE_SENTREDIRECT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000351 return "STATE_SENTREDIRECT";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000352 case Session::STATE_SENTTERMINATE:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000353 return "STATE_SENTTERMINATE";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000354 case Session::STATE_RECEIVEDTERMINATE:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000355 return "STATE_RECEIVEDTERMINATE";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000356 case Session::STATE_INPROGRESS:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000357 return "STATE_INPROGRESS";
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000358 case Session::STATE_DEINIT:
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000359 return "STATE_DEINIT";
360 default:
361 break;
362 }
363 return "STATE_" + rtc::ToString(state);
364}
365
366BaseSession::BaseSession(rtc::Thread* signaling_thread,
367 rtc::Thread* worker_thread,
368 PortAllocator* port_allocator,
369 const std::string& sid,
370 const std::string& content_type,
371 bool initiator)
372 : state_(STATE_INIT),
373 error_(ERROR_NONE),
374 signaling_thread_(signaling_thread),
375 worker_thread_(worker_thread),
376 port_allocator_(port_allocator),
377 sid_(sid),
378 content_type_(content_type),
379 transport_type_(NS_GINGLE_P2P),
380 initiator_(initiator),
381 identity_(NULL),
382 ice_tiebreaker_(rtc::CreateRandomId64()),
383 role_switch_(false) {
384 ASSERT(signaling_thread->IsCurrent());
385}
386
387BaseSession::~BaseSession() {
388 ASSERT(signaling_thread()->IsCurrent());
389
390 ASSERT(state_ != STATE_DEINIT);
391 LogState(state_, STATE_DEINIT);
392 state_ = STATE_DEINIT;
393 SignalState(this, state_);
394
395 for (TransportMap::iterator iter = transports_.begin();
396 iter != transports_.end(); ++iter) {
397 delete iter->second;
398 }
399}
400
401const SessionDescription* BaseSession::local_description() const {
402 // TODO(tommi): Assert on thread correctness.
403 return local_description_.get();
404}
405
406const SessionDescription* BaseSession::remote_description() const {
407 // TODO(tommi): Assert on thread correctness.
408 return remote_description_.get();
409}
410
411SessionDescription* BaseSession::remote_description() {
412 // TODO(tommi): Assert on thread correctness.
413 return remote_description_.get();
414}
415
416void BaseSession::set_local_description(const SessionDescription* sdesc) {
417 // TODO(tommi): Assert on thread correctness.
418 if (sdesc != local_description_.get())
419 local_description_.reset(sdesc);
420}
421
422void BaseSession::set_remote_description(SessionDescription* sdesc) {
423 // TODO(tommi): Assert on thread correctness.
424 if (sdesc != remote_description_)
425 remote_description_.reset(sdesc);
426}
427
428const SessionDescription* BaseSession::initiator_description() const {
429 // TODO(tommi): Assert on thread correctness.
430 return initiator_ ? local_description_.get() : remote_description_.get();
431}
432
433bool BaseSession::SetIdentity(rtc::SSLIdentity* identity) {
434 if (identity_)
435 return false;
436 identity_ = identity;
437 for (TransportMap::iterator iter = transports_.begin();
438 iter != transports_.end(); ++iter) {
439 iter->second->SetIdentity(identity_);
440 }
441 return true;
442}
443
444bool BaseSession::PushdownTransportDescription(ContentSource source,
445 ContentAction action,
446 std::string* error_desc) {
447 if (source == CS_LOCAL) {
448 return PushdownLocalTransportDescription(local_description(),
449 action,
450 error_desc);
451 }
452 return PushdownRemoteTransportDescription(remote_description(),
453 action,
454 error_desc);
455}
456
457bool BaseSession::PushdownLocalTransportDescription(
458 const SessionDescription* sdesc,
459 ContentAction action,
460 std::string* error_desc) {
461 // Update the Transports with the right information, and trigger them to
462 // start connecting.
463 for (TransportMap::iterator iter = transports_.begin();
464 iter != transports_.end(); ++iter) {
465 // If no transport info was in this session description, ret == false
466 // and we just skip this one.
467 TransportDescription tdesc;
468 bool ret = GetTransportDescription(
469 sdesc, iter->second->content_name(), &tdesc);
470 if (ret) {
471 if (!iter->second->SetLocalTransportDescription(tdesc, action,
472 error_desc)) {
473 return false;
474 }
475
476 iter->second->ConnectChannels();
477 }
478 }
479
480 return true;
481}
482
483bool BaseSession::PushdownRemoteTransportDescription(
484 const SessionDescription* sdesc,
485 ContentAction action,
486 std::string* error_desc) {
487 // Update the Transports with the right information.
488 for (TransportMap::iterator iter = transports_.begin();
489 iter != transports_.end(); ++iter) {
490 TransportDescription tdesc;
491
492 // If no transport info was in this session description, ret == false
493 // and we just skip this one.
494 bool ret = GetTransportDescription(
495 sdesc, iter->second->content_name(), &tdesc);
496 if (ret) {
497 if (!iter->second->SetRemoteTransportDescription(tdesc, action,
498 error_desc)) {
499 return false;
500 }
501 }
502 }
503
504 return true;
505}
506
507TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
508 const std::string& channel_name,
509 int component) {
510 // We create the proxy "on demand" here because we need to support
511 // creating channels at any time, even before we send or receive
512 // initiate messages, which is before we create the transports.
513 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
514 return transproxy->CreateChannel(channel_name, component);
515}
516
517TransportChannel* BaseSession::GetChannel(const std::string& content_name,
518 int component) {
519 TransportProxy* transproxy = GetTransportProxy(content_name);
520 if (transproxy == NULL)
521 return NULL;
522
523 return transproxy->GetChannel(component);
524}
525
526void BaseSession::DestroyChannel(const std::string& content_name,
527 int component) {
528 TransportProxy* transproxy = GetTransportProxy(content_name);
529 ASSERT(transproxy != NULL);
530 transproxy->DestroyChannel(component);
531}
532
533TransportProxy* BaseSession::GetOrCreateTransportProxy(
534 const std::string& content_name) {
535 TransportProxy* transproxy = GetTransportProxy(content_name);
536 if (transproxy)
537 return transproxy;
538
539 Transport* transport = CreateTransport(content_name);
540 transport->SetIceRole(initiator_ ? ICEROLE_CONTROLLING : ICEROLE_CONTROLLED);
541 transport->SetIceTiebreaker(ice_tiebreaker_);
542 // TODO: Connect all the Transport signals to TransportProxy
543 // then to the BaseSession.
544 transport->SignalConnecting.connect(
545 this, &BaseSession::OnTransportConnecting);
546 transport->SignalWritableState.connect(
547 this, &BaseSession::OnTransportWritable);
548 transport->SignalRequestSignaling.connect(
549 this, &BaseSession::OnTransportRequestSignaling);
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000550 transport->SignalTransportError.connect(
551 this, &BaseSession::OnTransportSendError);
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +0000552 transport->SignalRouteChange.connect(
553 this, &BaseSession::OnTransportRouteChange);
554 transport->SignalCandidatesAllocationDone.connect(
555 this, &BaseSession::OnTransportCandidatesAllocationDone);
556 transport->SignalRoleConflict.connect(
557 this, &BaseSession::OnRoleConflict);
558 transport->SignalCompleted.connect(
559 this, &BaseSession::OnTransportCompleted);
560 transport->SignalFailed.connect(
561 this, &BaseSession::OnTransportFailed);
562
563 transproxy = new TransportProxy(worker_thread_, sid_, content_name,
564 new TransportWrapper(transport));
565 transproxy->SignalCandidatesReady.connect(
566 this, &BaseSession::OnTransportProxyCandidatesReady);
567 if (identity_)
568 transproxy->SetIdentity(identity_);
569 transports_[content_name] = transproxy;
570
571 return transproxy;
572}
573
574Transport* BaseSession::GetTransport(const std::string& content_name) {
575 TransportProxy* transproxy = GetTransportProxy(content_name);
576 if (transproxy == NULL)
577 return NULL;
578 return transproxy->impl();
579}
580
581TransportProxy* BaseSession::GetTransportProxy(
582 const std::string& content_name) {
583 TransportMap::iterator iter = transports_.find(content_name);
584 return (iter != transports_.end()) ? iter->second : NULL;
585}
586
587TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
588 for (TransportMap::iterator iter = transports_.begin();
589 iter != transports_.end(); ++iter) {
590 TransportProxy* transproxy = iter->second;
591 if (transproxy->impl() == transport) {
592 return transproxy;
593 }
594 }
595 return NULL;
596}
597
598TransportProxy* BaseSession::GetFirstTransportProxy() {
599 if (transports_.empty())
600 return NULL;
601 return transports_.begin()->second;
602}
603
604void BaseSession::DestroyTransportProxy(
605 const std::string& content_name) {
606 TransportMap::iterator iter = transports_.find(content_name);
607 if (iter != transports_.end()) {
608 delete iter->second;
609 transports_.erase(content_name);
610 }
611}
612
613cricket::Transport* BaseSession::CreateTransport(
614 const std::string& content_name) {
615 ASSERT(transport_type_ == NS_GINGLE_P2P);
616 return new cricket::DtlsTransport<P2PTransport>(
617 signaling_thread(), worker_thread(), content_name,
618 port_allocator(), identity_);
619}
620
621bool BaseSession::GetStats(SessionStats* stats) {
622 for (TransportMap::iterator iter = transports_.begin();
623 iter != transports_.end(); ++iter) {
624 std::string proxy_id = iter->second->content_name();
625 // We are ignoring not-yet-instantiated transports.
626 if (iter->second->impl()) {
627 std::string transport_id = iter->second->impl()->content_name();
628 stats->proxy_to_transport[proxy_id] = transport_id;
629 if (stats->transport_stats.find(transport_id)
630 == stats->transport_stats.end()) {
631 TransportStats subinfos;
632 if (!iter->second->impl()->GetStats(&subinfos)) {
633 return false;
634 }
635 stats->transport_stats[transport_id] = subinfos;
636 }
637 }
638 }
639 return true;
640}
641
642void BaseSession::SetState(State state) {
643 ASSERT(signaling_thread_->IsCurrent());
644 if (state != state_) {
645 LogState(state_, state);
646 state_ = state;
647 SignalState(this, state_);
648 signaling_thread_->Post(this, MSG_STATE);
649 }
650 SignalNewDescription();
651}
652
653void BaseSession::SetError(Error error, const std::string& error_desc) {
654 ASSERT(signaling_thread_->IsCurrent());
655 if (error != error_) {
656 error_ = error;
657 error_desc_ = error_desc;
658 SignalError(this, error);
659 }
660}
661
662void BaseSession::OnSignalingReady() {
663 ASSERT(signaling_thread()->IsCurrent());
664 for (TransportMap::iterator iter = transports_.begin();
665 iter != transports_.end(); ++iter) {
666 iter->second->OnSignalingReady();
667 }
668}
669
670// TODO(juberti): Since PushdownLocalTD now triggers the connection process to
671// start, remove this method once everyone calls PushdownLocalTD.
672void BaseSession::SpeculativelyConnectAllTransportChannels() {
673 // Put all transports into the connecting state.
674 for (TransportMap::iterator iter = transports_.begin();
675 iter != transports_.end(); ++iter) {
676 iter->second->ConnectChannels();
677 }
678}
679
680bool BaseSession::OnRemoteCandidates(const std::string& content_name,
681 const Candidates& candidates,
682 std::string* error) {
683 // Give candidates to the appropriate transport, and tell that transport
684 // to start connecting, if it's not already doing so.
685 TransportProxy* transproxy = GetTransportProxy(content_name);
686 if (!transproxy) {
687 *error = "Unknown content name " + content_name;
688 return false;
689 }
690 if (!transproxy->OnRemoteCandidates(candidates, error)) {
691 return false;
692 }
693 // TODO(juberti): Remove this call once we can be sure that we always have
694 // a local transport description (which will trigger the connection).
695 transproxy->ConnectChannels();
696 return true;
697}
698
699bool BaseSession::MaybeEnableMuxingSupport() {
700 // We need both a local and remote description to decide if we should mux.
701 if ((state_ == STATE_SENTINITIATE ||
702 state_ == STATE_RECEIVEDINITIATE) &&
703 ((local_description_ == NULL) ||
704 (remote_description_ == NULL))) {
705 return false;
706 }
707
708 // In order to perform the multiplexing, we need all proxies to be in the
709 // negotiated state, i.e. to have implementations underneath.
710 // Ensure that this is the case, regardless of whether we are going to mux.
711 for (TransportMap::iterator iter = transports_.begin();
712 iter != transports_.end(); ++iter) {
713 ASSERT(iter->second->negotiated());
714 if (!iter->second->negotiated())
715 return false;
716 }
717
718 // If both sides agree to BUNDLE, mux all the specified contents onto the
719 // transport belonging to the first content name in the BUNDLE group.
720 // If the contents are already muxed, this will be a no-op.
721 // TODO(juberti): Should this check that local and remote have configured
722 // BUNDLE the same way?
723 bool candidates_allocated = IsCandidateAllocationDone();
724 const ContentGroup* local_bundle_group =
725 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
726 const ContentGroup* remote_bundle_group =
727 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
728 if (local_bundle_group && remote_bundle_group &&
729 local_bundle_group->FirstContentName()) {
730 const std::string* content_name = local_bundle_group->FirstContentName();
731 const ContentInfo* content =
732 local_description_->GetContentByName(*content_name);
733 ASSERT(content != NULL);
734 if (!SetSelectedProxy(content->name, local_bundle_group)) {
735 LOG(LS_WARNING) << "Failed to set up BUNDLE";
736 return false;
737 }
738
739 // If we weren't done gathering before, we might be done now, as a result
740 // of enabling mux.
741 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
742 << *content_name;
743 if (!candidates_allocated) {
744 MaybeCandidateAllocationDone();
745 }
746 } else {
747 LOG(LS_INFO) << "No BUNDLE information, not bundling.";
748 }
749 return true;
750}
751
752bool BaseSession::SetSelectedProxy(const std::string& content_name,
753 const ContentGroup* muxed_group) {
754 TransportProxy* selected_proxy = GetTransportProxy(content_name);
755 if (!selected_proxy) {
756 return false;
757 }
758
759 ASSERT(selected_proxy->negotiated());
760 for (TransportMap::iterator iter = transports_.begin();
761 iter != transports_.end(); ++iter) {
762 // If content is part of the mux group, then repoint its proxy at the
763 // transport object that we have chosen to mux onto. If the proxy
764 // is already pointing at the right object, it will be a no-op.
765 if (muxed_group->HasContentName(iter->first) &&
766 !iter->second->SetupMux(selected_proxy)) {
767 return false;
768 }
769 }
770 return true;
771}
772
773void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
774 // TODO(juberti): This is a clunky way of processing the done signal. Instead,
775 // TransportProxy should receive the done signal directly, set its allocated
776 // flag internally, and then reissue the done signal to Session.
777 // Overall we should make TransportProxy receive *all* the signals from
778 // Transport, since this removes the need to manually iterate over all
779 // the transports, as is needed to make sure signals are handled properly
780 // when BUNDLEing.
781 // TODO(juberti): Per b/7998978, devs and QA are hitting this assert in ways
782 // that make it prohibitively difficult to run dbg builds. Disabled for now.
783 //ASSERT(!IsCandidateAllocationDone());
784 for (TransportMap::iterator iter = transports_.begin();
785 iter != transports_.end(); ++iter) {
786 if (iter->second->impl() == transport) {
787 iter->second->set_candidates_allocated(true);
788 }
789 }
790 MaybeCandidateAllocationDone();
791}
792
793bool BaseSession::IsCandidateAllocationDone() const {
794 for (TransportMap::const_iterator iter = transports_.begin();
795 iter != transports_.end(); ++iter) {
796 if (!iter->second->candidates_allocated())
797 return false;
798 }
799 return true;
800}
801
802void BaseSession::MaybeCandidateAllocationDone() {
803 if (IsCandidateAllocationDone()) {
804 LOG(LS_INFO) << "Candidate gathering is complete.";
805 OnCandidatesAllocationDone();
806 }
807}
808
809void BaseSession::OnRoleConflict() {
810 if (role_switch_) {
811 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
812 return;
813 }
814
815 role_switch_ = true;
816 for (TransportMap::iterator iter = transports_.begin();
817 iter != transports_.end(); ++iter) {
818 // Role will be reverse of initial role setting.
819 IceRole role = initiator_ ? ICEROLE_CONTROLLED : ICEROLE_CONTROLLING;
820 iter->second->SetIceRole(role);
821 }
822}
823
824void BaseSession::LogState(State old_state, State new_state) {
825 LOG(LS_INFO) << "Session:" << id()
826 << " Old state:" << StateToString(old_state)
827 << " New state:" << StateToString(new_state)
828 << " Type:" << content_type()
829 << " Transport:" << transport_type();
830}
831
832// static
833bool BaseSession::GetTransportDescription(const SessionDescription* description,
834 const std::string& content_name,
835 TransportDescription* tdesc) {
836 if (!description || !tdesc) {
837 return false;
838 }
839 const TransportInfo* transport_info =
840 description->GetTransportInfoByName(content_name);
841 if (!transport_info) {
842 return false;
843 }
844 *tdesc = transport_info->description;
845 return true;
846}
847
848void BaseSession::SignalNewDescription() {
849 ContentAction action;
850 ContentSource source;
851 if (!GetContentAction(&action, &source)) {
852 return;
853 }
854 if (source == CS_LOCAL) {
855 SignalNewLocalDescription(this, action);
856 } else {
857 SignalNewRemoteDescription(this, action);
858 }
859}
860
861bool BaseSession::GetContentAction(ContentAction* action,
862 ContentSource* source) {
863 switch (state_) {
864 // new local description
865 case STATE_SENTINITIATE:
866 *action = CA_OFFER;
867 *source = CS_LOCAL;
868 break;
869 case STATE_SENTPRACCEPT:
870 *action = CA_PRANSWER;
871 *source = CS_LOCAL;
872 break;
873 case STATE_SENTACCEPT:
874 *action = CA_ANSWER;
875 *source = CS_LOCAL;
876 break;
877 // new remote description
878 case STATE_RECEIVEDINITIATE:
879 *action = CA_OFFER;
880 *source = CS_REMOTE;
881 break;
882 case STATE_RECEIVEDPRACCEPT:
883 *action = CA_PRANSWER;
884 *source = CS_REMOTE;
885 break;
886 case STATE_RECEIVEDACCEPT:
887 *action = CA_ANSWER;
888 *source = CS_REMOTE;
889 break;
890 default:
891 return false;
892 }
893 return true;
894}
895
896void BaseSession::OnMessage(rtc::Message *pmsg) {
897 switch (pmsg->message_id) {
898 case MSG_TIMEOUT:
899 // Session timeout has occured.
900 SetError(ERROR_TIME, "Session timeout has occured.");
901 break;
902
903 case MSG_STATE:
904 switch (state_) {
905 case STATE_SENTACCEPT:
906 case STATE_RECEIVEDACCEPT:
907 SetState(STATE_INPROGRESS);
908 break;
909
910 default:
911 // Explicitly ignoring some states here.
912 break;
913 }
914 break;
915 }
916}
917
pthatcher@webrtc.orgf0507912014-12-16 22:28:03 +0000918Session::Session(SessionManager* session_manager,
919 const std::string& local_name,
920 const std::string& initiator_name,
921 const std::string& sid,
922 const std::string& content_type,
923 SessionClient* client)
924 : BaseSession(session_manager->signaling_thread(),
925 session_manager->worker_thread(),
926 session_manager->port_allocator(),
927 sid, content_type, initiator_name == local_name) {
928 ASSERT(client != NULL);
929 session_manager_ = session_manager;
930 local_name_ = local_name;
931 initiator_name_ = initiator_name;
932 transport_parser_ = new P2PTransportParser();
933 client_ = client;
934 initiate_acked_ = false;
935 current_protocol_ = PROTOCOL_HYBRID;
936}
937
938Session::~Session() {
939 delete transport_parser_;
940}
941
942bool Session::Initiate(const std::string& to,
943 const SessionDescription* sdesc) {
944 ASSERT(signaling_thread()->IsCurrent());
945 SessionError error;
946
947 // Only from STATE_INIT
948 if (state() != STATE_INIT)
949 return false;
950
951 // Setup for signaling.
952 set_remote_name(to);
953 set_local_description(sdesc);
954 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
955 &error)) {
956 LOG(LS_ERROR) << "Could not create transports: " << error.text;
957 return false;
958 }
959
960 if (!SendInitiateMessage(sdesc, &error)) {
961 LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
962 return false;
963 }
964
965 // We need to connect transport proxy and impl here so that we can process
966 // the TransportDescriptions.
967 SpeculativelyConnectAllTransportChannels();
968
969 PushdownTransportDescription(CS_LOCAL, CA_OFFER, NULL);
970 SetState(Session::STATE_SENTINITIATE);
971 return true;
972}
973
974bool Session::Accept(const SessionDescription* sdesc) {
975 ASSERT(signaling_thread()->IsCurrent());
976
977 // Only if just received initiate
978 if (state() != STATE_RECEIVEDINITIATE)
979 return false;
980
981 // Setup for signaling.
982 set_local_description(sdesc);
983
984 SessionError error;
985 if (!SendAcceptMessage(sdesc, &error)) {
986 LOG(LS_ERROR) << "Could not send accept message: " << error.text;
987 return false;
988 }
989 // TODO(juberti): Add BUNDLE support to transport-info messages.
990 PushdownTransportDescription(CS_LOCAL, CA_ANSWER, NULL);
991 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported.
992 SetState(Session::STATE_SENTACCEPT);
993 return true;
994}
995
996bool Session::Reject(const std::string& reason) {
997 ASSERT(signaling_thread()->IsCurrent());
998
999 // Reject is sent in response to an initiate or modify, to reject the
1000 // request
1001 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
1002 return false;
1003
1004 SessionError error;
1005 if (!SendRejectMessage(reason, &error)) {
1006 LOG(LS_ERROR) << "Could not send reject message: " << error.text;
1007 return false;
1008 }
1009
1010 SetState(STATE_SENTREJECT);
1011 return true;
1012}
1013
1014bool Session::TerminateWithReason(const std::string& reason) {
1015 ASSERT(signaling_thread()->IsCurrent());
1016
1017 // Either side can terminate, at any time.
1018 switch (state()) {
1019 case STATE_SENTTERMINATE:
1020 case STATE_RECEIVEDTERMINATE:
1021 return false;
1022
1023 case STATE_SENTREJECT:
1024 case STATE_RECEIVEDREJECT:
1025 // We don't need to send terminate if we sent or received a reject...
1026 // it's implicit.
1027 break;
1028
1029 default:
1030 SessionError error;
1031 if (!SendTerminateMessage(reason, &error)) {
1032 LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
1033 return false;
1034 }
1035 break;
1036 }
1037
1038 SetState(STATE_SENTTERMINATE);
1039 return true;
1040}
1041
1042bool Session::SendInfoMessage(const XmlElements& elems,
1043 const std::string& remote_name) {
1044 ASSERT(signaling_thread()->IsCurrent());
1045 SessionError error;
1046 if (!SendMessage(ACTION_SESSION_INFO, elems, remote_name, &error)) {
1047 LOG(LS_ERROR) << "Could not send info message " << error.text;
1048 return false;
1049 }
1050 return true;
1051}
1052
1053bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
1054 XmlElements elems;
1055 WriteError write_error;
1056 if (!WriteDescriptionInfo(current_protocol_,
1057 contents,
1058 GetContentParsers(),
1059 &elems, &write_error)) {
1060 LOG(LS_ERROR) << "Could not write description info message: "
1061 << write_error.text;
1062 return false;
1063 }
1064 SessionError error;
1065 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
1066 LOG(LS_ERROR) << "Could not send description info message: "
1067 << error.text;
1068 return false;
1069 }
1070 return true;
1071}
1072
1073TransportInfos Session::GetEmptyTransportInfos(
1074 const ContentInfos& contents) const {
1075 TransportInfos tinfos;
1076 for (ContentInfos::const_iterator content = contents.begin();
1077 content != contents.end(); ++content) {
1078 tinfos.push_back(TransportInfo(content->name,
1079 TransportDescription(transport_type(),
1080 std::string(),
1081 std::string())));
1082 }
1083 return tinfos;
1084}
1085
1086bool Session::OnRemoteCandidates(
1087 const TransportInfos& tinfos, ParseError* error) {
1088 for (TransportInfos::const_iterator tinfo = tinfos.begin();
1089 tinfo != tinfos.end(); ++tinfo) {
1090 std::string str_error;
1091 if (!BaseSession::OnRemoteCandidates(
1092 tinfo->content_name, tinfo->description.candidates, &str_error)) {
1093 return BadParse(str_error, error);
1094 }
1095 }
1096 return true;
1097}
1098
1099bool Session::CreateTransportProxies(const TransportInfos& tinfos,
1100 SessionError* error) {
1101 for (TransportInfos::const_iterator tinfo = tinfos.begin();
1102 tinfo != tinfos.end(); ++tinfo) {
1103 if (tinfo->description.transport_type != transport_type()) {
1104 error->SetText("No supported transport in offer.");
1105 return false;
1106 }
1107
1108 GetOrCreateTransportProxy(tinfo->content_name);
1109 }
1110 return true;
1111}
1112
1113TransportParserMap Session::GetTransportParsers() {
1114 TransportParserMap parsers;
1115 parsers[transport_type()] = transport_parser_;
1116 return parsers;
1117}
1118
1119CandidateTranslatorMap Session::GetCandidateTranslators() {
1120 CandidateTranslatorMap translators;
1121 // NOTE: This technique makes it impossible to parse G-ICE
1122 // candidates in session-initiate messages because the channels
1123 // aren't yet created at that point. Since we don't use candidates
1124 // in session-initiate messages, we should be OK. Once we switch to
1125 // ICE, this translation shouldn't be necessary.
1126 for (TransportMap::const_iterator iter = transport_proxies().begin();
1127 iter != transport_proxies().end(); ++iter) {
1128 translators[iter->first] = iter->second;
1129 }
1130 return translators;
1131}
1132
1133ContentParserMap Session::GetContentParsers() {
1134 ContentParserMap parsers;
1135 parsers[content_type()] = client_;
1136 // We need to be able parse both RTP-based and SCTP-based Jingle
1137 // with the same client.
1138 if (content_type() == NS_JINGLE_RTP) {
1139 parsers[NS_JINGLE_DRAFT_SCTP] = client_;
1140 }
1141 return parsers;
1142}
1143
1144void Session::OnTransportRequestSignaling(Transport* transport) {
1145 ASSERT(signaling_thread()->IsCurrent());
1146 TransportProxy* transproxy = GetTransportProxy(transport);
1147 ASSERT(transproxy != NULL);
1148 if (transproxy) {
1149 // Reset candidate allocation status for the transport proxy.
1150 transproxy->set_candidates_allocated(false);
1151 }
1152 SignalRequestSignaling(this);
1153}
1154
1155void Session::OnTransportConnecting(Transport* transport) {
1156 // This is an indication that we should begin watching the writability
1157 // state of the transport.
1158 OnTransportWritable(transport);
1159}
1160
1161void Session::OnTransportWritable(Transport* transport) {
1162 ASSERT(signaling_thread()->IsCurrent());
1163
1164 // If the transport is not writable, start a timer to make sure that it
1165 // becomes writable within a reasonable amount of time. If it does not, we
1166 // terminate since we can't actually send data. If the transport is writable,
1167 // cancel the timer. Note that writability transitions may occur repeatedly
1168 // during the lifetime of the session.
1169 signaling_thread()->Clear(this, MSG_TIMEOUT);
1170 if (transport->HasChannels() && !transport->writable()) {
1171 signaling_thread()->PostDelayed(
1172 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
1173 }
1174}
1175
1176void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
1177 const Candidates& candidates) {
1178 ASSERT(signaling_thread()->IsCurrent());
1179 if (transproxy != NULL) {
1180 if (initiator() && !initiate_acked_) {
1181 // TODO: This is to work around server re-ordering
1182 // messages. We send the candidates once the session-initiate
1183 // is acked. Once we have fixed the server to guarantee message
1184 // order, we can remove this case.
1185 transproxy->AddUnsentCandidates(candidates);
1186 } else {
1187 if (!transproxy->negotiated()) {
1188 transproxy->AddSentCandidates(candidates);
1189 }
1190 SessionError error;
1191 if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
1192 LOG(LS_ERROR) << "Could not send transport info message: "
1193 << error.text;
1194 return;
1195 }
1196 }
1197 }
1198}
1199
1200void Session::OnTransportSendError(Transport* transport,
1201 const buzz::XmlElement* stanza,
1202 const buzz::QName& name,
1203 const std::string& type,
1204 const std::string& text,
1205 const buzz::XmlElement* extra_info) {
1206 ASSERT(signaling_thread()->IsCurrent());
1207 SignalErrorMessage(this, stanza, name, type, text, extra_info);
1208}
1209
1210void Session::OnIncomingMessage(const SessionMessage& msg) {
1211 ASSERT(signaling_thread()->IsCurrent());
1212 ASSERT(state() == STATE_INIT || msg.from == remote_name());
1213
1214 if (current_protocol_== PROTOCOL_HYBRID) {
1215 if (msg.protocol == PROTOCOL_GINGLE) {
1216 current_protocol_ = PROTOCOL_GINGLE;
1217 } else {
1218 current_protocol_ = PROTOCOL_JINGLE;
1219 }
1220 }
1221
1222 bool valid = false;
1223 MessageError error;
1224 switch (msg.type) {
1225 case ACTION_SESSION_INITIATE:
1226 valid = OnInitiateMessage(msg, &error);
1227 break;
1228 case ACTION_SESSION_INFO:
1229 valid = OnInfoMessage(msg);
1230 break;
1231 case ACTION_SESSION_ACCEPT:
1232 valid = OnAcceptMessage(msg, &error);
1233 break;
1234 case ACTION_SESSION_REJECT:
1235 valid = OnRejectMessage(msg, &error);
1236 break;
1237 case ACTION_SESSION_TERMINATE:
1238 valid = OnTerminateMessage(msg, &error);
1239 break;
1240 case ACTION_TRANSPORT_INFO:
1241 valid = OnTransportInfoMessage(msg, &error);
1242 break;
1243 case ACTION_TRANSPORT_ACCEPT:
1244 valid = OnTransportAcceptMessage(msg, &error);
1245 break;
1246 case ACTION_DESCRIPTION_INFO:
1247 valid = OnDescriptionInfoMessage(msg, &error);
1248 break;
1249 default:
1250 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
1251 "unknown session message type",
1252 &error);
1253 }
1254
1255 if (valid) {
1256 SendAcknowledgementMessage(msg.stanza);
1257 } else {
1258 SignalErrorMessage(this, msg.stanza, error.type,
1259 "modify", error.text, NULL);
1260 }
1261}
1262
1263void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
1264 const buzz::XmlElement* response_stanza,
1265 const SessionMessage& msg) {
1266 ASSERT(signaling_thread()->IsCurrent());
1267
1268 if (msg.type == ACTION_SESSION_INITIATE) {
1269 OnInitiateAcked();
1270 }
1271}
1272
1273void Session::OnInitiateAcked() {
1274 // TODO: This is to work around server re-ordering
1275 // messages. We send the candidates once the session-initiate
1276 // is acked. Once we have fixed the server to guarantee message
1277 // order, we can remove this case.
1278 if (!initiate_acked_) {
1279 initiate_acked_ = true;
1280 SessionError error;
1281 SendAllUnsentTransportInfoMessages(&error);
1282 }
1283}
1284
1285void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
1286 const buzz::XmlElement* error_stanza) {
1287 ASSERT(signaling_thread()->IsCurrent());
1288
1289 SessionMessage msg;
1290 ParseError parse_error;
1291 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
1292 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
1293 << ":" << orig_stanza;
1294 return;
1295 }
1296
1297 // If the error is a session redirect, call OnRedirectError, which will
1298 // continue the session with a new remote JID.
1299 SessionRedirect redirect;
1300 if (FindSessionRedirect(error_stanza, &redirect)) {
1301 SessionError error;
1302 if (!OnRedirectError(redirect, &error)) {
1303 // TODO: Should we send a message back? The standard
1304 // says nothing about it.
1305 std::ostringstream desc;
1306 desc << "Failed to redirect: " << error.text;
1307 LOG(LS_ERROR) << desc.str();
1308 SetError(ERROR_RESPONSE, desc.str());
1309 }
1310 return;
1311 }
1312
1313 std::string error_type = "cancel";
1314
1315 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
1316 if (error) {
1317 error_type = error->Attr(buzz::QN_TYPE);
1318
1319 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
1320 << "in response to:\n" << orig_stanza->Str();
1321 } else {
1322 // don't crash if <error> is missing
1323 LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
1324 return;
1325 }
1326
1327 if (msg.type == ACTION_TRANSPORT_INFO) {
1328 // Transport messages frequently generate errors because they are sent right
1329 // when we detect a network failure. For that reason, we ignore such
1330 // errors, because if we do not establish writability again, we will
1331 // terminate anyway. The exceptions are transport-specific error tags,
1332 // which we pass on to the respective transport.
1333 } else if ((error_type != "continue") && (error_type != "wait")) {
1334 // We do not set an error if the other side said it is okay to continue
1335 // (possibly after waiting). These errors can be ignored.
1336 SetError(ERROR_RESPONSE, "");
1337 }
1338}
1339
1340bool Session::OnInitiateMessage(const SessionMessage& msg,
1341 MessageError* error) {
1342 if (!CheckState(STATE_INIT, error))
1343 return false;
1344
1345 SessionInitiate init;
1346 if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
1347 GetContentParsers(), GetTransportParsers(),
1348 GetCandidateTranslators(),
1349 &init, error))
1350 return false;
1351
1352 SessionError session_error;
1353 if (!CreateTransportProxies(init.transports, &session_error)) {
1354 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
1355 session_error.text, error);
1356 }
1357
1358 set_remote_name(msg.from);
1359 set_initiator_name(msg.initiator);
1360 set_remote_description(new SessionDescription(init.ClearContents(),
1361 init.transports,
1362 init.groups));
1363 // Updating transport with TransportDescription.
1364 PushdownTransportDescription(CS_REMOTE, CA_OFFER, NULL);
1365 SetState(STATE_RECEIVEDINITIATE);
1366
1367 // Users of Session may listen to state change and call Reject().
1368 if (state() != STATE_SENTREJECT) {
1369 if (!OnRemoteCandidates(init.transports, error))
1370 return false;
1371
1372 // TODO(juberti): Auto-generate and push down the local transport answer.
1373 // This is necessary for trickling to work with RFC 5245 ICE.
1374 }
1375 return true;
1376}
1377
1378bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
1379 if (!CheckState(STATE_SENTINITIATE, error))
1380 return false;
1381
1382 SessionAccept accept;
1383 if (!ParseSessionAccept(msg.protocol, msg.action_elem,
1384 GetContentParsers(), GetTransportParsers(),
1385 GetCandidateTranslators(),
1386 &accept, error)) {
1387 return false;
1388 }
1389
1390 // If we get an accept, we can assume the initiate has been
1391 // received, even if we haven't gotten an IQ response.
1392 OnInitiateAcked();
1393
1394 set_remote_description(new SessionDescription(accept.ClearContents(),
1395 accept.transports,
1396 accept.groups));
1397 // Updating transport with TransportDescription.
1398 PushdownTransportDescription(CS_REMOTE, CA_ANSWER, NULL);
1399 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported.
1400 SetState(STATE_RECEIVEDACCEPT);
1401
1402 if (!OnRemoteCandidates(accept.transports, error))
1403 return false;
1404
1405 return true;
1406}
1407
1408bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
1409 if (!CheckState(STATE_SENTINITIATE, error))
1410 return false;
1411
1412 SetState(STATE_RECEIVEDREJECT);
1413 return true;
1414}
1415
1416bool Session::OnInfoMessage(const SessionMessage& msg) {
1417 SignalInfoMessage(this, msg.action_elem);
1418 return true;
1419}
1420
1421bool Session::OnTerminateMessage(const SessionMessage& msg,
1422 MessageError* error) {
1423 SessionTerminate term;
1424 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
1425 return false;
1426
1427 SignalReceivedTerminateReason(this, term.reason);
1428 if (term.debug_reason != buzz::STR_EMPTY) {
1429 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
1430 }
1431
1432 SetState(STATE_RECEIVEDTERMINATE);
1433 return true;
1434}
1435
1436bool Session::OnTransportInfoMessage(const SessionMessage& msg,
1437 MessageError* error) {
1438 TransportInfos tinfos;
1439 if (!ParseTransportInfos(msg.protocol, msg.action_elem,
1440 initiator_description()->contents(),
1441 GetTransportParsers(), GetCandidateTranslators(),
1442 &tinfos, error))
1443 return false;
1444
1445 if (!OnRemoteCandidates(tinfos, error))
1446 return false;
1447
1448 return true;
1449}
1450
1451bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
1452 MessageError* error) {
1453 // TODO: Currently here only for compatibility with
1454 // Gingle 1.1 clients (notably, Google Voice).
1455 return true;
1456}
1457
1458bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
1459 MessageError* error) {
1460 if (!CheckState(STATE_INPROGRESS, error))
1461 return false;
1462
1463 DescriptionInfo description_info;
1464 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
1465 GetContentParsers(), GetTransportParsers(),
1466 GetCandidateTranslators(),
1467 &description_info, error)) {
1468 return false;
1469 }
1470
1471 ContentInfos& updated_contents = description_info.contents;
1472
1473 // TODO: Currently, reflector sends back
1474 // video stream updates even for an audio-only call, which causes
1475 // this to fail. Put this back once reflector is fixed.
1476 //
1477 // ContentInfos::iterator it;
1478 // First, ensure all updates are valid before modifying remote_description_.
1479 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1480 // if (remote_description()->GetContentByName(it->name) == NULL) {
1481 // return false;
1482 // }
1483 // }
1484
1485 // TODO: We used to replace contents from an update, but
1486 // that no longer works with partial updates. We need to figure out
1487 // a way to merge patial updates into contents. For now, users of
1488 // Session should listen to SignalRemoteDescriptionUpdate and handle
1489 // updates. They should not expect remote_description to be the
1490 // latest value.
1491 //
1492 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1493 // remote_description()->RemoveContentByName(it->name);
1494 // remote_description()->AddContent(it->name, it->type, it->description);
1495 // }
1496 // }
1497
1498 SignalRemoteDescriptionUpdate(this, updated_contents);
1499 return true;
1500}
1501
1502bool BareJidsEqual(const std::string& name1,
1503 const std::string& name2) {
1504 buzz::Jid jid1(name1);
1505 buzz::Jid jid2(name2);
1506
1507 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
1508}
1509
1510bool Session::OnRedirectError(const SessionRedirect& redirect,
1511 SessionError* error) {
1512 MessageError message_error;
1513 if (!CheckState(STATE_SENTINITIATE, &message_error)) {
1514 return BadWrite(message_error.text, error);
1515 }
1516
1517 if (!BareJidsEqual(remote_name(), redirect.target))
1518 return BadWrite("Redirection not allowed: must be the same bare jid.",
1519 error);
1520
1521 // When we receive a redirect, we point the session at the new JID
1522 // and resend the candidates.
1523 set_remote_name(redirect.target);
1524 return (SendInitiateMessage(local_description(), error) &&
1525 ResendAllTransportInfoMessages(error));
1526}
1527
1528bool Session::CheckState(State expected, MessageError* error) {
1529 if (state() != expected) {
1530 // The server can deliver messages out of order/repeated for various
1531 // reasons. For example, if the server does not recive our iq response,
1532 // it could assume that the iq it sent was lost, and will then send
1533 // it again. Ideally, we should implement reliable messaging with
1534 // duplicate elimination.
1535 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
1536 "message not allowed in current state",
1537 error);
1538 }
1539 return true;
1540}
1541
1542void Session::SetError(Error error, const std::string& error_desc) {
1543 BaseSession::SetError(error, error_desc);
1544 if (error != ERROR_NONE)
1545 signaling_thread()->Post(this, MSG_ERROR);
1546}
1547
1548void Session::OnMessage(rtc::Message* pmsg) {
1549 // preserve this because BaseSession::OnMessage may modify it
1550 State orig_state = state();
1551
1552 BaseSession::OnMessage(pmsg);
1553
1554 switch (pmsg->message_id) {
1555 case MSG_ERROR:
1556 TerminateWithReason(STR_TERMINATE_ERROR);
1557 break;
1558
1559 case MSG_STATE:
1560 switch (orig_state) {
1561 case STATE_SENTREJECT:
1562 case STATE_RECEIVEDREJECT:
1563 // Assume clean termination.
1564 Terminate();
1565 break;
1566
1567 case STATE_SENTTERMINATE:
1568 case STATE_RECEIVEDTERMINATE:
1569 session_manager_->DestroySession(this);
1570 break;
1571
1572 default:
1573 // Explicitly ignoring some states here.
1574 break;
1575 }
1576 break;
1577 }
1578}
1579
1580bool Session::SendInitiateMessage(const SessionDescription* sdesc,
1581 SessionError* error) {
1582 SessionInitiate init;
1583 init.contents = sdesc->contents();
1584 init.transports = GetEmptyTransportInfos(init.contents);
1585 init.groups = sdesc->groups();
1586 return SendMessage(ACTION_SESSION_INITIATE, init, error);
1587}
1588
1589bool Session::WriteSessionAction(
1590 SignalingProtocol protocol, const SessionInitiate& init,
1591 XmlElements* elems, WriteError* error) {
1592 return WriteSessionInitiate(protocol, init.contents, init.transports,
1593 GetContentParsers(), GetTransportParsers(),
1594 GetCandidateTranslators(), init.groups,
1595 elems, error);
1596}
1597
1598bool Session::SendAcceptMessage(const SessionDescription* sdesc,
1599 SessionError* error) {
1600 XmlElements elems;
1601 if (!WriteSessionAccept(current_protocol_,
1602 sdesc->contents(),
1603 GetEmptyTransportInfos(sdesc->contents()),
1604 GetContentParsers(), GetTransportParsers(),
1605 GetCandidateTranslators(), sdesc->groups(),
1606 &elems, error)) {
1607 return false;
1608 }
1609 return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
1610}
1611
1612bool Session::SendRejectMessage(const std::string& reason,
1613 SessionError* error) {
1614 SessionTerminate term(reason);
1615 return SendMessage(ACTION_SESSION_REJECT, term, error);
1616}
1617
1618bool Session::SendTerminateMessage(const std::string& reason,
1619 SessionError* error) {
1620 SessionTerminate term(reason);
1621 return SendMessage(ACTION_SESSION_TERMINATE, term, error);
1622}
1623
1624bool Session::WriteSessionAction(SignalingProtocol protocol,
1625 const SessionTerminate& term,
1626 XmlElements* elems, WriteError* error) {
1627 WriteSessionTerminate(protocol, term, elems);
1628 return true;
1629}
1630
1631bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
1632 SessionError* error) {
1633 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
1634}
1635
1636bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
1637 const Candidates& candidates,
1638 SessionError* error) {
1639 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
1640 TransportDescription(transproxy->type(), std::vector<std::string>(),
1641 std::string(), std::string(), ICEMODE_FULL,
1642 CONNECTIONROLE_NONE, NULL, candidates)), error);
1643}
1644
1645bool Session::WriteSessionAction(SignalingProtocol protocol,
1646 const TransportInfo& tinfo,
1647 XmlElements* elems, WriteError* error) {
1648 TransportInfos tinfos;
1649 tinfos.push_back(tinfo);
1650 return WriteTransportInfos(protocol, tinfos,
1651 GetTransportParsers(), GetCandidateTranslators(),
1652 elems, error);
1653}
1654
1655bool Session::ResendAllTransportInfoMessages(SessionError* error) {
1656 for (TransportMap::const_iterator iter = transport_proxies().begin();
1657 iter != transport_proxies().end(); ++iter) {
1658 TransportProxy* transproxy = iter->second;
1659 if (transproxy->sent_candidates().size() > 0) {
1660 if (!SendTransportInfoMessage(
1661 transproxy, transproxy->sent_candidates(), error)) {
1662 LOG(LS_ERROR) << "Could not resend transport info messages: "
1663 << error->text;
1664 return false;
1665 }
1666 transproxy->ClearSentCandidates();
1667 }
1668 }
1669 return true;
1670}
1671
1672bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
1673 for (TransportMap::const_iterator iter = transport_proxies().begin();
1674 iter != transport_proxies().end(); ++iter) {
1675 TransportProxy* transproxy = iter->second;
1676 if (transproxy->unsent_candidates().size() > 0) {
1677 if (!SendTransportInfoMessage(
1678 transproxy, transproxy->unsent_candidates(), error)) {
1679 LOG(LS_ERROR) << "Could not send unsent transport info messages: "
1680 << error->text;
1681 return false;
1682 }
1683 transproxy->ClearUnsentCandidates();
1684 }
1685 }
1686 return true;
1687}
1688
1689bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1690 SessionError* error) {
1691 return SendMessage(type, action_elems, remote_name(), error);
1692}
1693
1694bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1695 const std::string& remote_name, SessionError* error) {
1696 rtc::scoped_ptr<buzz::XmlElement> stanza(
1697 new buzz::XmlElement(buzz::QN_IQ));
1698
1699 SessionMessage msg(current_protocol_, type, id(), initiator_name());
1700 msg.to = remote_name;
1701 WriteSessionMessage(msg, action_elems, stanza.get());
1702
1703 SignalOutgoingMessage(this, stanza.get());
1704 return true;
1705}
1706
1707template <typename Action>
1708bool Session::SendMessage(ActionType type, const Action& action,
1709 SessionError* error) {
1710 rtc::scoped_ptr<buzz::XmlElement> stanza(
1711 new buzz::XmlElement(buzz::QN_IQ));
1712 if (!WriteActionMessage(type, action, stanza.get(), error))
1713 return false;
1714
1715 SignalOutgoingMessage(this, stanza.get());
1716 return true;
1717}
1718
1719template <typename Action>
1720bool Session::WriteActionMessage(ActionType type, const Action& action,
1721 buzz::XmlElement* stanza,
1722 WriteError* error) {
1723 if (current_protocol_ == PROTOCOL_HYBRID) {
1724 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
1725 return false;
1726 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
1727 return false;
1728 } else {
1729 if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
1730 return false;
1731 }
1732 return true;
1733}
1734
1735template <typename Action>
1736bool Session::WriteActionMessage(SignalingProtocol protocol,
1737 ActionType type, const Action& action,
1738 buzz::XmlElement* stanza, WriteError* error) {
1739 XmlElements action_elems;
1740 if (!WriteSessionAction(protocol, action, &action_elems, error))
1741 return false;
1742
1743 SessionMessage msg(protocol, type, id(), initiator_name());
1744 msg.to = remote_name();
1745
1746 WriteSessionMessage(msg, action_elems, stanza);
1747 return true;
1748}
1749
1750void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
1751 rtc::scoped_ptr<buzz::XmlElement> ack(
1752 new buzz::XmlElement(buzz::QN_IQ));
1753 ack->SetAttr(buzz::QN_TO, remote_name());
1754 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
1755 ack->SetAttr(buzz::QN_TYPE, "result");
1756
1757 SignalOutgoingMessage(this, ack.get());
1758}
1759
henrike@webrtc.org269fb4b2014-10-28 22:20:11 +00001760} // namespace cricket