blob: 86428901cf1039dfaa861385c783676ff81015bf [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include "talk/p2p/base/session.h"
29#include "talk/base/common.h"
30#include "talk/base/logging.h"
31#include "talk/base/helpers.h"
32#include "talk/base/scoped_ptr.h"
33#include "talk/base/sslstreamadapter.h"
34#include "talk/xmpp/constants.h"
35#include "talk/xmpp/jid.h"
36#include "talk/p2p/base/dtlstransport.h"
37#include "talk/p2p/base/p2ptransport.h"
38#include "talk/p2p/base/sessionclient.h"
39#include "talk/p2p/base/transport.h"
40#include "talk/p2p/base/transportchannelproxy.h"
41#include "talk/p2p/base/transportinfo.h"
42
43#include "talk/p2p/base/constants.h"
44
45namespace cricket {
46
47bool BadMessage(const buzz::QName type,
48 const std::string& text,
49 MessageError* err) {
50 err->SetType(type);
51 err->SetText(text);
52 return false;
53}
54
55TransportProxy::~TransportProxy() {
56 for (ChannelMap::iterator iter = channels_.begin();
57 iter != channels_.end(); ++iter) {
58 iter->second->SignalDestroyed(iter->second);
59 delete iter->second;
60 }
61}
62
63std::string TransportProxy::type() const {
64 return transport_->get()->type();
65}
66
67TransportChannel* TransportProxy::GetChannel(int component) {
68 return GetChannelProxy(component);
69}
70
71TransportChannel* TransportProxy::CreateChannel(
72 const std::string& name, int component) {
73 ASSERT(GetChannel(component) == NULL);
74 ASSERT(!transport_->get()->HasChannel(component));
75
76 // We always create a proxy in case we need to change out the transport later.
77 TransportChannelProxy* channel =
78 new TransportChannelProxy(content_name(), name, component);
79 channels_[component] = channel;
80
81 // If we're already negotiated, create an impl and hook it up to the proxy
82 // channel. If we're connecting, create an impl but don't hook it up yet.
83 if (negotiated_) {
84 SetChannelProxyImpl(component, channel);
85 } else if (connecting_) {
86 GetOrCreateChannelProxyImpl(component);
87 }
88 return channel;
89}
90
91bool TransportProxy::HasChannel(int component) {
92 return transport_->get()->HasChannel(component);
93}
94
95void TransportProxy::DestroyChannel(int component) {
96 TransportChannel* channel = GetChannel(component);
97 if (channel) {
98 // If the state of TransportProxy is not NEGOTIATED
99 // then TransportChannelProxy and its impl are not
100 // connected. Both must be connected before
101 // deletion.
102 if (!negotiated_) {
103 SetChannelProxyImpl(component, GetChannelProxy(component));
104 }
105
106 channels_.erase(component);
107 channel->SignalDestroyed(channel);
108 delete channel;
109 }
110}
111
112void TransportProxy::ConnectChannels() {
113 if (!connecting_) {
114 if (!negotiated_) {
115 for (ChannelMap::iterator iter = channels_.begin();
116 iter != channels_.end(); ++iter) {
117 GetOrCreateChannelProxyImpl(iter->first);
118 }
119 }
120 connecting_ = true;
121 }
122 // TODO(juberti): Right now Transport::ConnectChannels doesn't work if we
123 // don't have any channels yet, so we need to allow this method to be called
124 // multiple times. Once we fix Transport, we can move this call inside the
125 // if (!connecting_) block.
126 transport_->get()->ConnectChannels();
127}
128
129void TransportProxy::CompleteNegotiation() {
130 if (!negotiated_) {
131 for (ChannelMap::iterator iter = channels_.begin();
132 iter != channels_.end(); ++iter) {
133 SetChannelProxyImpl(iter->first, iter->second);
134 }
135 negotiated_ = true;
136 }
137}
138
139void TransportProxy::AddSentCandidates(const Candidates& candidates) {
140 for (Candidates::const_iterator cand = candidates.begin();
141 cand != candidates.end(); ++cand) {
142 sent_candidates_.push_back(*cand);
143 }
144}
145
146void TransportProxy::AddUnsentCandidates(const Candidates& candidates) {
147 for (Candidates::const_iterator cand = candidates.begin();
148 cand != candidates.end(); ++cand) {
149 unsent_candidates_.push_back(*cand);
150 }
151}
152
153bool TransportProxy::GetChannelNameFromComponent(
154 int component, std::string* channel_name) const {
155 const TransportChannelProxy* channel = GetChannelProxy(component);
156 if (channel == NULL) {
157 return false;
158 }
159
160 *channel_name = channel->name();
161 return true;
162}
163
164bool TransportProxy::GetComponentFromChannelName(
165 const std::string& channel_name, int* component) const {
166 const TransportChannelProxy* channel = GetChannelProxyByName(channel_name);
167 if (channel == NULL) {
168 return false;
169 }
170
171 *component = channel->component();
172 return true;
173}
174
175TransportChannelProxy* TransportProxy::GetChannelProxy(int component) const {
176 ChannelMap::const_iterator iter = channels_.find(component);
177 return (iter != channels_.end()) ? iter->second : NULL;
178}
179
180TransportChannelProxy* TransportProxy::GetChannelProxyByName(
181 const std::string& name) const {
182 for (ChannelMap::const_iterator iter = channels_.begin();
183 iter != channels_.end();
184 ++iter) {
185 if (iter->second->name() == name) {
186 return iter->second;
187 }
188 }
189 return NULL;
190}
191
192TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl(
193 int component) {
194 TransportChannelImpl* impl = transport_->get()->GetChannel(component);
195 if (impl == NULL) {
196 impl = transport_->get()->CreateChannel(component);
197 impl->SetSessionId(sid_);
198 }
199 return impl;
200}
201
202void TransportProxy::SetChannelProxyImpl(
203 int component, TransportChannelProxy* transproxy) {
204 TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
205 ASSERT(impl != NULL);
206 transproxy->SetImplementation(impl);
207}
208
209// This function muxes |this| onto |target| by repointing |this| at
210// |target|'s transport and setting our TransportChannelProxies
211// to point to |target|'s underlying implementations.
212bool TransportProxy::SetupMux(TransportProxy* target) {
213 // Bail out if there's nothing to do.
214 if (transport_ == target->transport_) {
215 return true;
216 }
217
218 // Run through all channels and remove any non-rtp transport channels before
219 // setting target transport channels.
220 for (ChannelMap::const_iterator iter = channels_.begin();
221 iter != channels_.end(); ++iter) {
222 if (!target->transport_->get()->HasChannel(iter->first)) {
223 // Remove if channel doesn't exist in |transport_|.
224 iter->second->SetImplementation(NULL);
225 } else {
226 // Replace the impl for all the TransportProxyChannels with the channels
227 // from |target|'s transport. Fail if there's not an exact match.
228 iter->second->SetImplementation(
229 target->transport_->get()->CreateChannel(iter->first));
230 }
231 }
232
233 // Now replace our transport. Must happen afterwards because
234 // it deletes all impls as a side effect.
235 transport_ = target->transport_;
236 transport_->get()->SignalCandidatesReady.connect(
237 this, &TransportProxy::OnTransportCandidatesReady);
238 set_candidates_allocated(target->candidates_allocated());
239 return true;
240}
241
242void TransportProxy::SetRole(TransportRole role) {
243 transport_->get()->SetRole(role);
244}
245
246bool TransportProxy::SetLocalTransportDescription(
247 const TransportDescription& description, ContentAction action) {
248 // If this is an answer, finalize the negotiation.
249 if (action == CA_ANSWER) {
250 CompleteNegotiation();
251 }
252 return transport_->get()->SetLocalTransportDescription(description, action);
253}
254
255bool TransportProxy::SetRemoteTransportDescription(
256 const TransportDescription& description, ContentAction action) {
257 // If this is an answer, finalize the negotiation.
258 if (action == CA_ANSWER) {
259 CompleteNegotiation();
260 }
261 return transport_->get()->SetRemoteTransportDescription(description, action);
262}
263
264void TransportProxy::OnSignalingReady() {
265 // If we're starting a new allocation sequence, reset our state.
266 set_candidates_allocated(false);
267 transport_->get()->OnSignalingReady();
268}
269
270bool TransportProxy::OnRemoteCandidates(const Candidates& candidates,
271 std::string* error) {
272 // Ensure the transport is negotiated before handling candidates.
273 // TODO(juberti): Remove this once everybody calls SetLocalTD.
274 CompleteNegotiation();
275
276 // Verify each candidate before passing down to transport layer.
277 for (Candidates::const_iterator cand = candidates.begin();
278 cand != candidates.end(); ++cand) {
279 if (!transport_->get()->VerifyCandidate(*cand, error))
280 return false;
281 if (!HasChannel(cand->component())) {
282 *error = "Candidate has unknown component: " + cand->ToString() +
283 " for content: " + content_name_;
284 return false;
285 }
286 }
287 transport_->get()->OnRemoteCandidates(candidates);
288 return true;
289}
290
291std::string BaseSession::StateToString(State state) {
292 switch (state) {
293 case Session::STATE_INIT:
294 return "STATE_INIT";
295 case Session::STATE_SENTINITIATE:
296 return "STATE_SENTINITIATE";
297 case Session::STATE_RECEIVEDINITIATE:
298 return "STATE_RECEIVEDINITIATE";
299 case Session::STATE_SENTPRACCEPT:
300 return "STATE_SENTPRACCEPT";
301 case Session::STATE_SENTACCEPT:
302 return "STATE_SENTACCEPT";
303 case Session::STATE_RECEIVEDPRACCEPT:
304 return "STATE_RECEIVEDPRACCEPT";
305 case Session::STATE_RECEIVEDACCEPT:
306 return "STATE_RECEIVEDACCEPT";
307 case Session::STATE_SENTMODIFY:
308 return "STATE_SENTMODIFY";
309 case Session::STATE_RECEIVEDMODIFY:
310 return "STATE_RECEIVEDMODIFY";
311 case Session::STATE_SENTREJECT:
312 return "STATE_SENTREJECT";
313 case Session::STATE_RECEIVEDREJECT:
314 return "STATE_RECEIVEDREJECT";
315 case Session::STATE_SENTREDIRECT:
316 return "STATE_SENTREDIRECT";
317 case Session::STATE_SENTTERMINATE:
318 return "STATE_SENTTERMINATE";
319 case Session::STATE_RECEIVEDTERMINATE:
320 return "STATE_RECEIVEDTERMINATE";
321 case Session::STATE_INPROGRESS:
322 return "STATE_INPROGRESS";
323 case Session::STATE_DEINIT:
324 return "STATE_DEINIT";
325 default:
326 break;
327 }
328 return "STATE_" + talk_base::ToString(state);
329}
330
331BaseSession::BaseSession(talk_base::Thread* signaling_thread,
332 talk_base::Thread* worker_thread,
333 PortAllocator* port_allocator,
334 const std::string& sid,
335 const std::string& content_type,
336 bool initiator)
337 : state_(STATE_INIT),
338 error_(ERROR_NONE),
339 signaling_thread_(signaling_thread),
340 worker_thread_(worker_thread),
341 port_allocator_(port_allocator),
342 sid_(sid),
343 content_type_(content_type),
344 transport_type_(NS_GINGLE_P2P),
345 initiator_(initiator),
346 identity_(NULL),
347 local_description_(NULL),
348 remote_description_(NULL),
349 ice_tiebreaker_(talk_base::CreateRandomId64()),
350 role_switch_(false) {
351 ASSERT(signaling_thread->IsCurrent());
352}
353
354BaseSession::~BaseSession() {
355 ASSERT(signaling_thread()->IsCurrent());
356
357 ASSERT(state_ != STATE_DEINIT);
358 LogState(state_, STATE_DEINIT);
359 state_ = STATE_DEINIT;
360 SignalState(this, state_);
361
362 for (TransportMap::iterator iter = transports_.begin();
363 iter != transports_.end(); ++iter) {
364 delete iter->second;
365 }
366
367 delete remote_description_;
368 delete local_description_;
369}
370
371bool BaseSession::PushdownTransportDescription(ContentSource source,
372 ContentAction action) {
373 if (source == CS_LOCAL) {
374 return PushdownLocalTransportDescription(local_description_, action);
375 }
376 return PushdownRemoteTransportDescription(remote_description_, action);
377}
378
379bool BaseSession::PushdownLocalTransportDescription(
380 const SessionDescription* sdesc,
381 ContentAction action) {
382 // Update the Transports with the right information, and trigger them to
383 // start connecting.
384 for (TransportMap::iterator iter = transports_.begin();
385 iter != transports_.end(); ++iter) {
386 // If no transport info was in this session description, ret == false
387 // and we just skip this one.
388 TransportDescription tdesc;
389 bool ret = GetTransportDescription(
390 sdesc, iter->second->content_name(), &tdesc);
391 if (ret) {
392 if (!iter->second->SetLocalTransportDescription(tdesc, action)) {
393 return false;
394 }
395
396 iter->second->ConnectChannels();
397 }
398 }
399
400 return true;
401}
402
403bool BaseSession::PushdownRemoteTransportDescription(
404 const SessionDescription* sdesc,
405 ContentAction action) {
406 // Update the Transports with the right information.
407 for (TransportMap::iterator iter = transports_.begin();
408 iter != transports_.end(); ++iter) {
409 TransportDescription tdesc;
410
411 // If no transport info was in this session description, ret == false
412 // and we just skip this one.
413 bool ret = GetTransportDescription(
414 sdesc, iter->second->content_name(), &tdesc);
415 if (ret) {
416 if (!iter->second->SetRemoteTransportDescription(tdesc, action)) {
417 return false;
418 }
419 }
420 }
421
422 return true;
423}
424
425TransportChannel* BaseSession::CreateChannel(const std::string& content_name,
426 const std::string& channel_name,
427 int component) {
428 // We create the proxy "on demand" here because we need to support
429 // creating channels at any time, even before we send or receive
430 // initiate messages, which is before we create the transports.
431 TransportProxy* transproxy = GetOrCreateTransportProxy(content_name);
432 return transproxy->CreateChannel(channel_name, component);
433}
434
435TransportChannel* BaseSession::GetChannel(const std::string& content_name,
436 int component) {
437 TransportProxy* transproxy = GetTransportProxy(content_name);
438 if (transproxy == NULL)
439 return NULL;
440 else
441 return transproxy->GetChannel(component);
442}
443
444void BaseSession::DestroyChannel(const std::string& content_name,
445 int component) {
446 TransportProxy* transproxy = GetTransportProxy(content_name);
447 ASSERT(transproxy != NULL);
448 transproxy->DestroyChannel(component);
449}
450
451TransportProxy* BaseSession::GetOrCreateTransportProxy(
452 const std::string& content_name) {
453 TransportProxy* transproxy = GetTransportProxy(content_name);
454 if (transproxy)
455 return transproxy;
456
457 Transport* transport = CreateTransport(content_name);
458 transport->SetRole(initiator_ ? ROLE_CONTROLLING : ROLE_CONTROLLED);
459 transport->SetTiebreaker(ice_tiebreaker_);
460 // TODO: Connect all the Transport signals to TransportProxy
461 // then to the BaseSession.
462 transport->SignalConnecting.connect(
463 this, &BaseSession::OnTransportConnecting);
464 transport->SignalWritableState.connect(
465 this, &BaseSession::OnTransportWritable);
466 transport->SignalRequestSignaling.connect(
467 this, &BaseSession::OnTransportRequestSignaling);
468 transport->SignalTransportError.connect(
469 this, &BaseSession::OnTransportSendError);
470 transport->SignalRouteChange.connect(
471 this, &BaseSession::OnTransportRouteChange);
472 transport->SignalCandidatesAllocationDone.connect(
473 this, &BaseSession::OnTransportCandidatesAllocationDone);
474 transport->SignalRoleConflict.connect(
475 this, &BaseSession::OnRoleConflict);
476
477 transproxy = new TransportProxy(sid_, content_name,
478 new TransportWrapper(transport));
479 transproxy->SignalCandidatesReady.connect(
480 this, &BaseSession::OnTransportProxyCandidatesReady);
481 transports_[content_name] = transproxy;
482
483 return transproxy;
484}
485
486Transport* BaseSession::GetTransport(const std::string& content_name) {
487 TransportProxy* transproxy = GetTransportProxy(content_name);
488 if (transproxy == NULL)
489 return NULL;
490 return transproxy->impl();
491}
492
493TransportProxy* BaseSession::GetTransportProxy(
494 const std::string& content_name) {
495 TransportMap::iterator iter = transports_.find(content_name);
496 return (iter != transports_.end()) ? iter->second : NULL;
497}
498
499TransportProxy* BaseSession::GetTransportProxy(const Transport* transport) {
500 for (TransportMap::iterator iter = transports_.begin();
501 iter != transports_.end(); ++iter) {
502 TransportProxy* transproxy = iter->second;
503 if (transproxy->impl() == transport) {
504 return transproxy;
505 }
506 }
507 return NULL;
508}
509
510TransportProxy* BaseSession::GetFirstTransportProxy() {
511 if (transports_.empty())
512 return NULL;
513 return transports_.begin()->second;
514}
515
516void BaseSession::DestroyTransportProxy(
517 const std::string& content_name) {
518 TransportMap::iterator iter = transports_.find(content_name);
519 if (iter != transports_.end()) {
520 delete iter->second;
521 transports_.erase(content_name);
522 }
523}
524
525cricket::Transport* BaseSession::CreateTransport(
526 const std::string& content_name) {
527 ASSERT(transport_type_ == NS_GINGLE_P2P);
528 return new cricket::DtlsTransport<P2PTransport>(
529 signaling_thread(), worker_thread(), content_name,
530 port_allocator(), identity_);
531}
532
533bool BaseSession::GetStats(SessionStats* stats) {
534 for (TransportMap::iterator iter = transports_.begin();
535 iter != transports_.end(); ++iter) {
536 std::string proxy_id = iter->second->content_name();
537 // We are ignoring not-yet-instantiated transports.
538 if (iter->second->impl()) {
539 std::string transport_id = iter->second->impl()->content_name();
540 stats->proxy_to_transport[proxy_id] = transport_id;
541 if (stats->transport_stats.find(transport_id)
542 == stats->transport_stats.end()) {
543 TransportStats subinfos;
544 if (!iter->second->impl()->GetStats(&subinfos)) {
545 return false;
546 }
547 stats->transport_stats[transport_id] = subinfos;
548 }
549 }
550 }
551 return true;
552}
553
554void BaseSession::SetState(State state) {
555 ASSERT(signaling_thread_->IsCurrent());
556 if (state != state_) {
557 LogState(state_, state);
558 state_ = state;
559 SignalState(this, state_);
560 signaling_thread_->Post(this, MSG_STATE);
561 }
562 SignalNewDescription();
563}
564
565void BaseSession::SetError(Error error) {
566 ASSERT(signaling_thread_->IsCurrent());
567 if (error != error_) {
568 error_ = error;
569 SignalError(this, error);
570 }
571}
572
573void BaseSession::OnSignalingReady() {
574 ASSERT(signaling_thread()->IsCurrent());
575 for (TransportMap::iterator iter = transports_.begin();
576 iter != transports_.end(); ++iter) {
577 iter->second->OnSignalingReady();
578 }
579}
580
581// TODO(juberti): Since PushdownLocalTD now triggers the connection process to
582// start, remove this method once everyone calls PushdownLocalTD.
583void BaseSession::SpeculativelyConnectAllTransportChannels() {
584 // Put all transports into the connecting state.
585 for (TransportMap::iterator iter = transports_.begin();
586 iter != transports_.end(); ++iter) {
587 iter->second->ConnectChannels();
588 }
589}
590
591bool BaseSession::OnRemoteCandidates(const std::string& content_name,
592 const Candidates& candidates,
593 std::string* error) {
594 // Give candidates to the appropriate transport, and tell that transport
595 // to start connecting, if it's not already doing so.
596 TransportProxy* transproxy = GetTransportProxy(content_name);
597 if (!transproxy) {
598 *error = "Unknown content name " + content_name;
599 return false;
600 }
601 if (!transproxy->OnRemoteCandidates(candidates, error)) {
602 return false;
603 }
604 // TODO(juberti): Remove this call once we can be sure that we always have
605 // a local transport description (which will trigger the connection).
606 transproxy->ConnectChannels();
607 return true;
608}
609
610bool BaseSession::MaybeEnableMuxingSupport() {
611 // We need both a local and remote description to decide if we should mux.
612 if ((state_ == STATE_SENTINITIATE ||
613 state_ == STATE_RECEIVEDINITIATE) &&
614 ((local_description_ == NULL) ||
615 (remote_description_ == NULL))) {
616 return false;
617 }
618
619 // In order to perform the multiplexing, we need all proxies to be in the
620 // negotiated state, i.e. to have implementations underneath.
621 // Ensure that this is the case, regardless of whether we are going to mux.
622 for (TransportMap::iterator iter = transports_.begin();
623 iter != transports_.end(); ++iter) {
624 ASSERT(iter->second->negotiated());
625 if (!iter->second->negotiated())
626 return false;
627 }
628
629 // If both sides agree to BUNDLE, mux all the specified contents onto the
630 // transport belonging to the first content name in the BUNDLE group.
631 // If the contents are already muxed, this will be a no-op.
632 // TODO(juberti): Should this check that local and remote have configured
633 // BUNDLE the same way?
634 bool candidates_allocated = IsCandidateAllocationDone();
635 const ContentGroup* local_bundle_group =
636 local_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
637 const ContentGroup* remote_bundle_group =
638 remote_description()->GetGroupByName(GROUP_TYPE_BUNDLE);
639 if (local_bundle_group && remote_bundle_group &&
640 local_bundle_group->FirstContentName()) {
641 const std::string* content_name = local_bundle_group->FirstContentName();
642 const ContentInfo* content =
643 local_description_->GetContentByName(*content_name);
644 ASSERT(content != NULL);
645 if (!SetSelectedProxy(content->name, local_bundle_group)) {
646 LOG(LS_WARNING) << "Failed to set up BUNDLE";
647 return false;
648 }
649
650 // If we weren't done gathering before, we might be done now, as a result
651 // of enabling mux.
652 LOG(LS_INFO) << "Enabling BUNDLE, bundling onto transport: "
653 << *content_name;
654 if (!candidates_allocated) {
655 MaybeCandidateAllocationDone();
656 }
657 } else {
658 LOG(LS_INFO) << "No BUNDLE information, not bundling.";
659 }
660 return true;
661}
662
663bool BaseSession::SetSelectedProxy(const std::string& content_name,
664 const ContentGroup* muxed_group) {
665 TransportProxy* selected_proxy = GetTransportProxy(content_name);
666 if (!selected_proxy) {
667 return false;
668 }
669
670 ASSERT(selected_proxy->negotiated());
671 for (TransportMap::iterator iter = transports_.begin();
672 iter != transports_.end(); ++iter) {
673 // If content is part of the mux group, then repoint its proxy at the
674 // transport object that we have chosen to mux onto. If the proxy
675 // is already pointing at the right object, it will be a no-op.
676 if (muxed_group->HasContentName(iter->first) &&
677 !iter->second->SetupMux(selected_proxy)) {
678 return false;
679 }
680 }
681 return true;
682}
683
684void BaseSession::OnTransportCandidatesAllocationDone(Transport* transport) {
685 // TODO(juberti): This is a clunky way of processing the done signal. Instead,
686 // TransportProxy should receive the done signal directly, set its allocated
687 // flag internally, and then reissue the done signal to Session.
688 // Overall we should make TransportProxy receive *all* the signals from
689 // Transport, since this removes the need to manually iterate over all
690 // the transports, as is needed to make sure signals are handled properly
691 // when BUNDLEing.
692#if 0
693 ASSERT(!IsCandidateAllocationDone());
694#endif
695 for (TransportMap::iterator iter = transports_.begin();
696 iter != transports_.end(); ++iter) {
697 if (iter->second->impl() == transport) {
698 iter->second->set_candidates_allocated(true);
699 }
700 }
701 MaybeCandidateAllocationDone();
702}
703
704bool BaseSession::IsCandidateAllocationDone() const {
705 for (TransportMap::const_iterator iter = transports_.begin();
706 iter != transports_.end(); ++iter) {
707 if (!iter->second->candidates_allocated())
708 return false;
709 }
710 return true;
711}
712
713void BaseSession::MaybeCandidateAllocationDone() {
714 if (IsCandidateAllocationDone()) {
715 LOG(LS_INFO) << "Candidate gathering is complete.";
716 OnCandidatesAllocationDone();
717 }
718}
719
720void BaseSession::OnRoleConflict() {
721 if (role_switch_) {
722 LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
723 return;
724 }
725
726 role_switch_ = true;
727 for (TransportMap::iterator iter = transports_.begin();
728 iter != transports_.end(); ++iter) {
729 // Role will be reverse of initial role setting.
730 TransportRole role = initiator_ ? ROLE_CONTROLLED : ROLE_CONTROLLING;
731 iter->second->SetRole(role);
732 }
733}
734
735void BaseSession::LogState(State old_state, State new_state) {
736 LOG(LS_INFO) << "Session:" << id()
737 << " Old state:" << StateToString(old_state)
738 << " New state:" << StateToString(new_state)
739 << " Type:" << content_type()
740 << " Transport:" << transport_type();
741}
742
743bool BaseSession::GetTransportDescription(const SessionDescription* description,
744 const std::string& content_name,
745 TransportDescription* tdesc) {
746 if (!description || !tdesc) {
747 return false;
748 }
749 const TransportInfo* transport_info =
750 description->GetTransportInfoByName(content_name);
751 if (!transport_info) {
752 return false;
753 }
754 *tdesc = transport_info->description;
755 return true;
756}
757
758void BaseSession::SignalNewDescription() {
759 ContentAction action;
760 ContentSource source;
761 if (!GetContentAction(&action, &source)) {
762 return;
763 }
764 if (source == CS_LOCAL) {
765 SignalNewLocalDescription(this, action);
766 } else {
767 SignalNewRemoteDescription(this, action);
768 }
769}
770
771bool BaseSession::GetContentAction(ContentAction* action,
772 ContentSource* source) {
773 switch (state_) {
774 // new local description
775 case STATE_SENTINITIATE:
776 *action = CA_OFFER;
777 *source = CS_LOCAL;
778 break;
779 case STATE_SENTPRACCEPT:
780 *action = CA_PRANSWER;
781 *source = CS_LOCAL;
782 break;
783 case STATE_SENTACCEPT:
784 *action = CA_ANSWER;
785 *source = CS_LOCAL;
786 break;
787 // new remote description
788 case STATE_RECEIVEDINITIATE:
789 *action = CA_OFFER;
790 *source = CS_REMOTE;
791 break;
792 case STATE_RECEIVEDPRACCEPT:
793 *action = CA_PRANSWER;
794 *source = CS_REMOTE;
795 break;
796 case STATE_RECEIVEDACCEPT:
797 *action = CA_ANSWER;
798 *source = CS_REMOTE;
799 break;
800 default:
801 return false;
802 }
803 return true;
804}
805
806void BaseSession::OnMessage(talk_base::Message *pmsg) {
807 switch (pmsg->message_id) {
808 case MSG_TIMEOUT:
809 // Session timeout has occured.
810 SetError(ERROR_TIME);
811 break;
812
813 case MSG_STATE:
814 switch (state_) {
815 case STATE_SENTACCEPT:
816 case STATE_RECEIVEDACCEPT:
817 SetState(STATE_INPROGRESS);
818 break;
819
820 default:
821 // Explicitly ignoring some states here.
822 break;
823 }
824 break;
825 }
826}
827
828Session::Session(SessionManager* session_manager,
829 const std::string& local_name,
830 const std::string& initiator_name,
831 const std::string& sid,
832 const std::string& content_type,
833 SessionClient* client)
834 : BaseSession(session_manager->signaling_thread(),
835 session_manager->worker_thread(),
836 session_manager->port_allocator(),
837 sid, content_type, initiator_name == local_name) {
838 ASSERT(client != NULL);
839 session_manager_ = session_manager;
840 local_name_ = local_name;
841 initiator_name_ = initiator_name;
842 transport_parser_ = new P2PTransportParser();
843 client_ = client;
844 initiate_acked_ = false;
845 current_protocol_ = PROTOCOL_HYBRID;
846}
847
848Session::~Session() {
849 delete transport_parser_;
850}
851
852bool Session::Initiate(const std::string &to,
853 const SessionDescription* sdesc) {
854 ASSERT(signaling_thread()->IsCurrent());
855 SessionError error;
856
857 // Only from STATE_INIT
858 if (state() != STATE_INIT)
859 return false;
860
861 // Setup for signaling.
862 set_remote_name(to);
863 set_local_description(sdesc);
864 if (!CreateTransportProxies(GetEmptyTransportInfos(sdesc->contents()),
865 &error)) {
866 LOG(LS_ERROR) << "Could not create transports: " << error.text;
867 return false;
868 }
869
870 if (!SendInitiateMessage(sdesc, &error)) {
871 LOG(LS_ERROR) << "Could not send initiate message: " << error.text;
872 return false;
873 }
874
875 // We need to connect transport proxy and impl here so that we can process
876 // the TransportDescriptions.
877 SpeculativelyConnectAllTransportChannels();
878
879 PushdownTransportDescription(CS_LOCAL, CA_OFFER);
880 SetState(Session::STATE_SENTINITIATE);
881 return true;
882}
883
884bool Session::Accept(const SessionDescription* sdesc) {
885 ASSERT(signaling_thread()->IsCurrent());
886
887 // Only if just received initiate
888 if (state() != STATE_RECEIVEDINITIATE)
889 return false;
890
891 // Setup for signaling.
892 set_local_description(sdesc);
893
894 SessionError error;
895 if (!SendAcceptMessage(sdesc, &error)) {
896 LOG(LS_ERROR) << "Could not send accept message: " << error.text;
897 return false;
898 }
899 // TODO(juberti): Add BUNDLE support to transport-info messages.
900 PushdownTransportDescription(CS_LOCAL, CA_ANSWER);
901 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported.
902 SetState(Session::STATE_SENTACCEPT);
903 return true;
904}
905
906bool Session::Reject(const std::string& reason) {
907 ASSERT(signaling_thread()->IsCurrent());
908
909 // Reject is sent in response to an initiate or modify, to reject the
910 // request
911 if (state() != STATE_RECEIVEDINITIATE && state() != STATE_RECEIVEDMODIFY)
912 return false;
913
914 SessionError error;
915 if (!SendRejectMessage(reason, &error)) {
916 LOG(LS_ERROR) << "Could not send reject message: " << error.text;
917 return false;
918 }
919
920 SetState(STATE_SENTREJECT);
921 return true;
922}
923
924bool Session::TerminateWithReason(const std::string& reason) {
925 ASSERT(signaling_thread()->IsCurrent());
926
927 // Either side can terminate, at any time.
928 switch (state()) {
929 case STATE_SENTTERMINATE:
930 case STATE_RECEIVEDTERMINATE:
931 return false;
932
933 case STATE_SENTREJECT:
934 case STATE_RECEIVEDREJECT:
935 // We don't need to send terminate if we sent or received a reject...
936 // it's implicit.
937 break;
938
939 default:
940 SessionError error;
941 if (!SendTerminateMessage(reason, &error)) {
942 LOG(LS_ERROR) << "Could not send terminate message: " << error.text;
943 return false;
944 }
945 break;
946 }
947
948 SetState(STATE_SENTTERMINATE);
949 return true;
950}
951
952bool Session::SendInfoMessage(const XmlElements& elems) {
953 ASSERT(signaling_thread()->IsCurrent());
954 SessionError error;
955 if (!SendMessage(ACTION_SESSION_INFO, elems, &error)) {
956 LOG(LS_ERROR) << "Could not send info message " << error.text;
957 return false;
958 }
959 return true;
960}
961
962bool Session::SendDescriptionInfoMessage(const ContentInfos& contents) {
963 XmlElements elems;
964 WriteError write_error;
965 if (!WriteDescriptionInfo(current_protocol_,
966 contents,
967 GetContentParsers(),
968 &elems, &write_error)) {
969 LOG(LS_ERROR) << "Could not write description info message: "
970 << write_error.text;
971 return false;
972 }
973 SessionError error;
974 if (!SendMessage(ACTION_DESCRIPTION_INFO, elems, &error)) {
975 LOG(LS_ERROR) << "Could not send description info message: "
976 << error.text;
977 return false;
978 }
979 return true;
980}
981
982TransportInfos Session::GetEmptyTransportInfos(
983 const ContentInfos& contents) const {
984 TransportInfos tinfos;
985 for (ContentInfos::const_iterator content = contents.begin();
986 content != contents.end(); ++content) {
987 tinfos.push_back(
988 TransportInfo(content->name,
989 TransportDescription(transport_type(), Candidates())));
990 }
991 return tinfos;
992}
993
994bool Session::OnRemoteCandidates(
995 const TransportInfos& tinfos, ParseError* error) {
996 for (TransportInfos::const_iterator tinfo = tinfos.begin();
997 tinfo != tinfos.end(); ++tinfo) {
998 std::string str_error;
999 if (!BaseSession::OnRemoteCandidates(
1000 tinfo->content_name, tinfo->description.candidates, &str_error)) {
1001 return BadParse(str_error, error);
1002 }
1003 }
1004 return true;
1005}
1006
1007bool Session::CreateTransportProxies(const TransportInfos& tinfos,
1008 SessionError* error) {
1009 for (TransportInfos::const_iterator tinfo = tinfos.begin();
1010 tinfo != tinfos.end(); ++tinfo) {
1011 if (tinfo->description.transport_type != transport_type()) {
1012 error->SetText("No supported transport in offer.");
1013 return false;
1014 }
1015
1016 GetOrCreateTransportProxy(tinfo->content_name);
1017 }
1018 return true;
1019}
1020
1021TransportParserMap Session::GetTransportParsers() {
1022 TransportParserMap parsers;
1023 parsers[transport_type()] = transport_parser_;
1024 return parsers;
1025}
1026
1027CandidateTranslatorMap Session::GetCandidateTranslators() {
1028 CandidateTranslatorMap translators;
1029 // NOTE: This technique makes it impossible to parse G-ICE
1030 // candidates in session-initiate messages because the channels
1031 // aren't yet created at that point. Since we don't use candidates
1032 // in session-initiate messages, we should be OK. Once we switch to
1033 // ICE, this translation shouldn't be necessary.
1034 for (TransportMap::const_iterator iter = transport_proxies().begin();
1035 iter != transport_proxies().end(); ++iter) {
1036 translators[iter->first] = iter->second;
1037 }
1038 return translators;
1039}
1040
1041ContentParserMap Session::GetContentParsers() {
1042 ContentParserMap parsers;
1043 parsers[content_type()] = client_;
1044 // We need to be able parse both RTP-based and SCTP-based Jingle
1045 // with the same client.
1046 if (content_type() == NS_JINGLE_RTP) {
1047 parsers[NS_JINGLE_DRAFT_SCTP] = client_;
1048 }
1049 return parsers;
1050}
1051
1052void Session::OnTransportRequestSignaling(Transport* transport) {
1053 ASSERT(signaling_thread()->IsCurrent());
1054 TransportProxy* transproxy = GetTransportProxy(transport);
1055 ASSERT(transproxy != NULL);
1056 if (transproxy) {
1057 // Reset candidate allocation status for the transport proxy.
1058 transproxy->set_candidates_allocated(false);
1059 }
1060 SignalRequestSignaling(this);
1061}
1062
1063void Session::OnTransportConnecting(Transport* transport) {
1064 // This is an indication that we should begin watching the writability
1065 // state of the transport.
1066 OnTransportWritable(transport);
1067}
1068
1069void Session::OnTransportWritable(Transport* transport) {
1070 ASSERT(signaling_thread()->IsCurrent());
1071
1072 // If the transport is not writable, start a timer to make sure that it
1073 // becomes writable within a reasonable amount of time. If it does not, we
1074 // terminate since we can't actually send data. If the transport is writable,
1075 // cancel the timer. Note that writability transitions may occur repeatedly
1076 // during the lifetime of the session.
1077 signaling_thread()->Clear(this, MSG_TIMEOUT);
1078 if (transport->HasChannels() && !transport->writable()) {
1079 signaling_thread()->PostDelayed(
1080 session_manager_->session_timeout() * 1000, this, MSG_TIMEOUT);
1081 }
1082}
1083
1084void Session::OnTransportProxyCandidatesReady(TransportProxy* transproxy,
1085 const Candidates& candidates) {
1086 ASSERT(signaling_thread()->IsCurrent());
1087 if (transproxy != NULL) {
1088 if (initiator() && !initiate_acked_) {
1089 // TODO: This is to work around server re-ordering
1090 // messages. We send the candidates once the session-initiate
1091 // is acked. Once we have fixed the server to guarantee message
1092 // order, we can remove this case.
1093 transproxy->AddUnsentCandidates(candidates);
1094 } else {
1095 if (!transproxy->negotiated()) {
1096 transproxy->AddSentCandidates(candidates);
1097 }
1098 SessionError error;
1099 if (!SendTransportInfoMessage(transproxy, candidates, &error)) {
1100 LOG(LS_ERROR) << "Could not send transport info message: "
1101 << error.text;
1102 return;
1103 }
1104 }
1105 }
1106}
1107
1108void Session::OnTransportSendError(Transport* transport,
1109 const buzz::XmlElement* stanza,
1110 const buzz::QName& name,
1111 const std::string& type,
1112 const std::string& text,
1113 const buzz::XmlElement* extra_info) {
1114 ASSERT(signaling_thread()->IsCurrent());
1115 SignalErrorMessage(this, stanza, name, type, text, extra_info);
1116}
1117
1118void Session::OnIncomingMessage(const SessionMessage& msg) {
1119 ASSERT(signaling_thread()->IsCurrent());
1120 ASSERT(state() == STATE_INIT || msg.from == remote_name());
1121
1122 if (current_protocol_== PROTOCOL_HYBRID) {
1123 if (msg.protocol == PROTOCOL_GINGLE) {
1124 current_protocol_ = PROTOCOL_GINGLE;
1125 } else {
1126 current_protocol_ = PROTOCOL_JINGLE;
1127 }
1128 }
1129
1130 bool valid = false;
1131 MessageError error;
1132 switch (msg.type) {
1133 case ACTION_SESSION_INITIATE:
1134 valid = OnInitiateMessage(msg, &error);
1135 break;
1136 case ACTION_SESSION_INFO:
1137 valid = OnInfoMessage(msg);
1138 break;
1139 case ACTION_SESSION_ACCEPT:
1140 valid = OnAcceptMessage(msg, &error);
1141 break;
1142 case ACTION_SESSION_REJECT:
1143 valid = OnRejectMessage(msg, &error);
1144 break;
1145 case ACTION_SESSION_TERMINATE:
1146 valid = OnTerminateMessage(msg, &error);
1147 break;
1148 case ACTION_TRANSPORT_INFO:
1149 valid = OnTransportInfoMessage(msg, &error);
1150 break;
1151 case ACTION_TRANSPORT_ACCEPT:
1152 valid = OnTransportAcceptMessage(msg, &error);
1153 break;
1154 case ACTION_DESCRIPTION_INFO:
1155 valid = OnDescriptionInfoMessage(msg, &error);
1156 break;
1157 default:
1158 valid = BadMessage(buzz::QN_STANZA_BAD_REQUEST,
1159 "unknown session message type",
1160 &error);
1161 }
1162
1163 if (valid) {
1164 SendAcknowledgementMessage(msg.stanza);
1165 } else {
1166 SignalErrorMessage(this, msg.stanza, error.type,
1167 "modify", error.text, NULL);
1168 }
1169}
1170
1171void Session::OnIncomingResponse(const buzz::XmlElement* orig_stanza,
1172 const buzz::XmlElement* response_stanza,
1173 const SessionMessage& msg) {
1174 ASSERT(signaling_thread()->IsCurrent());
1175
1176 if (msg.type == ACTION_SESSION_INITIATE) {
1177 OnInitiateAcked();
1178 }
1179}
1180
1181void Session::OnInitiateAcked() {
1182 // TODO: This is to work around server re-ordering
1183 // messages. We send the candidates once the session-initiate
1184 // is acked. Once we have fixed the server to guarantee message
1185 // order, we can remove this case.
1186 if (!initiate_acked_) {
1187 initiate_acked_ = true;
1188 SessionError error;
1189 SendAllUnsentTransportInfoMessages(&error);
1190 }
1191}
1192
1193void Session::OnFailedSend(const buzz::XmlElement* orig_stanza,
1194 const buzz::XmlElement* error_stanza) {
1195 ASSERT(signaling_thread()->IsCurrent());
1196
1197 SessionMessage msg;
1198 ParseError parse_error;
1199 if (!ParseSessionMessage(orig_stanza, &msg, &parse_error)) {
1200 LOG(LS_ERROR) << "Error parsing failed send: " << parse_error.text
1201 << ":" << orig_stanza;
1202 return;
1203 }
1204
1205 // If the error is a session redirect, call OnRedirectError, which will
1206 // continue the session with a new remote JID.
1207 SessionRedirect redirect;
1208 if (FindSessionRedirect(error_stanza, &redirect)) {
1209 SessionError error;
1210 if (!OnRedirectError(redirect, &error)) {
1211 // TODO: Should we send a message back? The standard
1212 // says nothing about it.
1213 LOG(LS_ERROR) << "Failed to redirect: " << error.text;
1214 SetError(ERROR_RESPONSE);
1215 }
1216 return;
1217 }
1218
1219 std::string error_type = "cancel";
1220
1221 const buzz::XmlElement* error = error_stanza->FirstNamed(buzz::QN_ERROR);
1222 if (error) {
1223 error_type = error->Attr(buzz::QN_TYPE);
1224
1225 LOG(LS_ERROR) << "Session error:\n" << error->Str() << "\n"
1226 << "in response to:\n" << orig_stanza->Str();
1227 } else {
1228 // don't crash if <error> is missing
1229 LOG(LS_ERROR) << "Session error without <error/> element, ignoring";
1230 return;
1231 }
1232
1233 if (msg.type == ACTION_TRANSPORT_INFO) {
1234 // Transport messages frequently generate errors because they are sent right
1235 // when we detect a network failure. For that reason, we ignore such
1236 // errors, because if we do not establish writability again, we will
1237 // terminate anyway. The exceptions are transport-specific error tags,
1238 // which we pass on to the respective transport.
1239 } else if ((error_type != "continue") && (error_type != "wait")) {
1240 // We do not set an error if the other side said it is okay to continue
1241 // (possibly after waiting). These errors can be ignored.
1242 SetError(ERROR_RESPONSE);
1243 }
1244}
1245
1246bool Session::OnInitiateMessage(const SessionMessage& msg,
1247 MessageError* error) {
1248 if (!CheckState(STATE_INIT, error))
1249 return false;
1250
1251 SessionInitiate init;
1252 if (!ParseSessionInitiate(msg.protocol, msg.action_elem,
1253 GetContentParsers(), GetTransportParsers(),
1254 GetCandidateTranslators(),
1255 &init, error))
1256 return false;
1257
1258 SessionError session_error;
1259 if (!CreateTransportProxies(init.transports, &session_error)) {
1260 return BadMessage(buzz::QN_STANZA_NOT_ACCEPTABLE,
1261 session_error.text, error);
1262 }
1263
1264 set_remote_name(msg.from);
1265 set_initiator_name(msg.initiator);
1266 set_remote_description(new SessionDescription(init.ClearContents(),
1267 init.transports,
1268 init.groups));
1269 // Updating transport with TransportDescription.
1270 PushdownTransportDescription(CS_REMOTE, CA_OFFER);
1271 SetState(STATE_RECEIVEDINITIATE);
1272
1273 // Users of Session may listen to state change and call Reject().
1274 if (state() != STATE_SENTREJECT) {
1275 if (!OnRemoteCandidates(init.transports, error))
1276 return false;
1277
1278 // TODO(juberti): Auto-generate and push down the local transport answer.
1279 // This is necessary for trickling to work with RFC 5245 ICE.
1280 }
1281 return true;
1282}
1283
1284bool Session::OnAcceptMessage(const SessionMessage& msg, MessageError* error) {
1285 if (!CheckState(STATE_SENTINITIATE, error))
1286 return false;
1287
1288 SessionAccept accept;
1289 if (!ParseSessionAccept(msg.protocol, msg.action_elem,
1290 GetContentParsers(), GetTransportParsers(),
1291 GetCandidateTranslators(),
1292 &accept, error)) {
1293 return false;
1294 }
1295
1296 // If we get an accept, we can assume the initiate has been
1297 // received, even if we haven't gotten an IQ response.
1298 OnInitiateAcked();
1299
1300 set_remote_description(new SessionDescription(accept.ClearContents(),
1301 accept.transports,
1302 accept.groups));
1303 // Updating transport with TransportDescription.
1304 PushdownTransportDescription(CS_REMOTE, CA_ANSWER);
1305 MaybeEnableMuxingSupport(); // Enable transport channel mux if supported.
1306 SetState(STATE_RECEIVEDACCEPT);
1307
1308 if (!OnRemoteCandidates(accept.transports, error))
1309 return false;
1310
1311 return true;
1312}
1313
1314bool Session::OnRejectMessage(const SessionMessage& msg, MessageError* error) {
1315 if (!CheckState(STATE_SENTINITIATE, error))
1316 return false;
1317
1318 SetState(STATE_RECEIVEDREJECT);
1319 return true;
1320}
1321
1322bool Session::OnInfoMessage(const SessionMessage& msg) {
1323 SignalInfoMessage(this, msg.action_elem);
1324 return true;
1325}
1326
1327bool Session::OnTerminateMessage(const SessionMessage& msg,
1328 MessageError* error) {
1329 SessionTerminate term;
1330 if (!ParseSessionTerminate(msg.protocol, msg.action_elem, &term, error))
1331 return false;
1332
1333 SignalReceivedTerminateReason(this, term.reason);
1334 if (term.debug_reason != buzz::STR_EMPTY) {
1335 LOG(LS_VERBOSE) << "Received error on call: " << term.debug_reason;
1336 }
1337
1338 SetState(STATE_RECEIVEDTERMINATE);
1339 return true;
1340}
1341
1342bool Session::OnTransportInfoMessage(const SessionMessage& msg,
1343 MessageError* error) {
1344 TransportInfos tinfos;
1345 if (!ParseTransportInfos(msg.protocol, msg.action_elem,
1346 initiator_description()->contents(),
1347 GetTransportParsers(), GetCandidateTranslators(),
1348 &tinfos, error))
1349 return false;
1350
1351 if (!OnRemoteCandidates(tinfos, error))
1352 return false;
1353
1354 return true;
1355}
1356
1357bool Session::OnTransportAcceptMessage(const SessionMessage& msg,
1358 MessageError* error) {
1359 // TODO: Currently here only for compatibility with
1360 // Gingle 1.1 clients (notably, Google Voice).
1361 return true;
1362}
1363
1364bool Session::OnDescriptionInfoMessage(const SessionMessage& msg,
1365 MessageError* error) {
1366 if (!CheckState(STATE_INPROGRESS, error))
1367 return false;
1368
1369 DescriptionInfo description_info;
1370 if (!ParseDescriptionInfo(msg.protocol, msg.action_elem,
1371 GetContentParsers(), GetTransportParsers(),
1372 GetCandidateTranslators(),
1373 &description_info, error)) {
1374 return false;
1375 }
1376
henrike@webrtc.org1e09a712013-07-26 19:17:59 +00001377 ContentInfos& updated_contents = description_info.contents;
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001378
1379 // TODO: Currently, reflector sends back
1380 // video stream updates even for an audio-only call, which causes
1381 // this to fail. Put this back once reflector is fixed.
1382 //
1383 // ContentInfos::iterator it;
1384 // First, ensure all updates are valid before modifying remote_description_.
1385 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1386 // if (remote_description()->GetContentByName(it->name) == NULL) {
1387 // return false;
1388 // }
1389 // }
1390
1391 // TODO: We used to replace contents from an update, but
1392 // that no longer works with partial updates. We need to figure out
1393 // a way to merge patial updates into contents. For now, users of
1394 // Session should listen to SignalRemoteDescriptionUpdate and handle
1395 // updates. They should not expect remote_description to be the
1396 // latest value.
1397 //
1398 // for (it = updated_contents.begin(); it != updated_contents.end(); ++it) {
1399 // remote_description()->RemoveContentByName(it->name);
1400 // remote_description()->AddContent(it->name, it->type, it->description);
1401 // }
1402 // }
1403
1404 SignalRemoteDescriptionUpdate(this, updated_contents);
1405 return true;
1406}
1407
1408bool BareJidsEqual(const std::string& name1,
1409 const std::string& name2) {
1410 buzz::Jid jid1(name1);
1411 buzz::Jid jid2(name2);
1412
1413 return jid1.IsValid() && jid2.IsValid() && jid1.BareEquals(jid2);
1414}
1415
1416bool Session::OnRedirectError(const SessionRedirect& redirect,
1417 SessionError* error) {
1418 MessageError message_error;
1419 if (!CheckState(STATE_SENTINITIATE, &message_error)) {
1420 return BadWrite(message_error.text, error);
1421 }
1422
1423 if (!BareJidsEqual(remote_name(), redirect.target))
1424 return BadWrite("Redirection not allowed: must be the same bare jid.",
1425 error);
1426
1427 // When we receive a redirect, we point the session at the new JID
1428 // and resend the candidates.
1429 set_remote_name(redirect.target);
1430 return (SendInitiateMessage(local_description(), error) &&
1431 ResendAllTransportInfoMessages(error));
1432}
1433
1434bool Session::CheckState(State expected, MessageError* error) {
1435 if (state() != expected) {
1436 // The server can deliver messages out of order/repeated for various
1437 // reasons. For example, if the server does not recive our iq response,
1438 // it could assume that the iq it sent was lost, and will then send
1439 // it again. Ideally, we should implement reliable messaging with
1440 // duplicate elimination.
1441 return BadMessage(buzz::QN_STANZA_NOT_ALLOWED,
1442 "message not allowed in current state",
1443 error);
1444 }
1445 return true;
1446}
1447
1448void Session::SetError(Error error) {
1449 BaseSession::SetError(error);
1450 if (error != ERROR_NONE)
1451 signaling_thread()->Post(this, MSG_ERROR);
1452}
1453
1454void Session::OnMessage(talk_base::Message* pmsg) {
1455 // preserve this because BaseSession::OnMessage may modify it
1456 State orig_state = state();
1457
1458 BaseSession::OnMessage(pmsg);
1459
1460 switch (pmsg->message_id) {
1461 case MSG_ERROR:
1462 TerminateWithReason(STR_TERMINATE_ERROR);
1463 break;
1464
1465 case MSG_STATE:
1466 switch (orig_state) {
1467 case STATE_SENTREJECT:
1468 case STATE_RECEIVEDREJECT:
1469 // Assume clean termination.
1470 Terminate();
1471 break;
1472
1473 case STATE_SENTTERMINATE:
1474 case STATE_RECEIVEDTERMINATE:
1475 session_manager_->DestroySession(this);
1476 break;
1477
1478 default:
1479 // Explicitly ignoring some states here.
1480 break;
1481 }
1482 break;
1483 }
1484}
1485
1486bool Session::SendInitiateMessage(const SessionDescription* sdesc,
1487 SessionError* error) {
1488 SessionInitiate init;
1489 init.contents = sdesc->contents();
1490 init.transports = GetEmptyTransportInfos(init.contents);
1491 init.groups = sdesc->groups();
1492 return SendMessage(ACTION_SESSION_INITIATE, init, error);
1493}
1494
1495bool Session::WriteSessionAction(
1496 SignalingProtocol protocol, const SessionInitiate& init,
1497 XmlElements* elems, WriteError* error) {
1498 return WriteSessionInitiate(protocol, init.contents, init.transports,
1499 GetContentParsers(), GetTransportParsers(),
1500 GetCandidateTranslators(), init.groups,
1501 elems, error);
1502}
1503
1504bool Session::SendAcceptMessage(const SessionDescription* sdesc,
1505 SessionError* error) {
1506 XmlElements elems;
1507 if (!WriteSessionAccept(current_protocol_,
1508 sdesc->contents(),
1509 GetEmptyTransportInfos(sdesc->contents()),
1510 GetContentParsers(), GetTransportParsers(),
1511 GetCandidateTranslators(), sdesc->groups(),
1512 &elems, error)) {
1513 return false;
1514 }
1515 return SendMessage(ACTION_SESSION_ACCEPT, elems, error);
1516}
1517
1518bool Session::SendRejectMessage(const std::string& reason,
1519 SessionError* error) {
1520 SessionTerminate term(reason);
1521 return SendMessage(ACTION_SESSION_REJECT, term, error);
1522}
1523
1524bool Session::SendTerminateMessage(const std::string& reason,
1525 SessionError* error) {
1526 SessionTerminate term(reason);
1527 return SendMessage(ACTION_SESSION_TERMINATE, term, error);
1528}
1529
1530bool Session::WriteSessionAction(SignalingProtocol protocol,
1531 const SessionTerminate& term,
1532 XmlElements* elems, WriteError* error) {
1533 WriteSessionTerminate(protocol, term, elems);
1534 return true;
1535}
1536
1537bool Session::SendTransportInfoMessage(const TransportInfo& tinfo,
1538 SessionError* error) {
1539 return SendMessage(ACTION_TRANSPORT_INFO, tinfo, error);
1540}
1541
1542bool Session::SendTransportInfoMessage(const TransportProxy* transproxy,
1543 const Candidates& candidates,
1544 SessionError* error) {
1545 return SendTransportInfoMessage(TransportInfo(transproxy->content_name(),
1546 TransportDescription(transproxy->type(), candidates)), error);
1547}
1548
1549bool Session::WriteSessionAction(SignalingProtocol protocol,
1550 const TransportInfo& tinfo,
1551 XmlElements* elems, WriteError* error) {
1552 TransportInfos tinfos;
1553 tinfos.push_back(tinfo);
1554 return WriteTransportInfos(protocol, tinfos,
1555 GetTransportParsers(), GetCandidateTranslators(),
1556 elems, error);
1557}
1558
1559bool Session::ResendAllTransportInfoMessages(SessionError* error) {
1560 for (TransportMap::const_iterator iter = transport_proxies().begin();
1561 iter != transport_proxies().end(); ++iter) {
1562 TransportProxy* transproxy = iter->second;
1563 if (transproxy->sent_candidates().size() > 0) {
1564 if (!SendTransportInfoMessage(
1565 transproxy, transproxy->sent_candidates(), error)) {
1566 LOG(LS_ERROR) << "Could not resend transport info messages: "
1567 << error->text;
1568 return false;
1569 }
1570 transproxy->ClearSentCandidates();
1571 }
1572 }
1573 return true;
1574}
1575
1576bool Session::SendAllUnsentTransportInfoMessages(SessionError* error) {
1577 for (TransportMap::const_iterator iter = transport_proxies().begin();
1578 iter != transport_proxies().end(); ++iter) {
1579 TransportProxy* transproxy = iter->second;
1580 if (transproxy->unsent_candidates().size() > 0) {
1581 if (!SendTransportInfoMessage(
1582 transproxy, transproxy->unsent_candidates(), error)) {
1583 LOG(LS_ERROR) << "Could not send unsent transport info messages: "
1584 << error->text;
1585 return false;
1586 }
1587 transproxy->ClearUnsentCandidates();
1588 }
1589 }
1590 return true;
1591}
1592
1593bool Session::SendMessage(ActionType type, const XmlElements& action_elems,
1594 SessionError* error) {
1595 talk_base::scoped_ptr<buzz::XmlElement> stanza(
1596 new buzz::XmlElement(buzz::QN_IQ));
1597
1598 SessionMessage msg(current_protocol_, type, id(), initiator_name());
1599 msg.to = remote_name();
1600 WriteSessionMessage(msg, action_elems, stanza.get());
1601
1602 SignalOutgoingMessage(this, stanza.get());
1603 return true;
1604}
1605
1606template <typename Action>
1607bool Session::SendMessage(ActionType type, const Action& action,
1608 SessionError* error) {
1609 talk_base::scoped_ptr<buzz::XmlElement> stanza(
1610 new buzz::XmlElement(buzz::QN_IQ));
1611 if (!WriteActionMessage(type, action, stanza.get(), error))
1612 return false;
1613
1614 SignalOutgoingMessage(this, stanza.get());
1615 return true;
1616}
1617
1618template <typename Action>
1619bool Session::WriteActionMessage(ActionType type, const Action& action,
1620 buzz::XmlElement* stanza,
1621 WriteError* error) {
1622 if (current_protocol_ == PROTOCOL_HYBRID) {
1623 if (!WriteActionMessage(PROTOCOL_JINGLE, type, action, stanza, error))
1624 return false;
1625 if (!WriteActionMessage(PROTOCOL_GINGLE, type, action, stanza, error))
1626 return false;
1627 } else {
1628 if (!WriteActionMessage(current_protocol_, type, action, stanza, error))
1629 return false;
1630 }
1631 return true;
1632}
1633
1634template <typename Action>
1635bool Session::WriteActionMessage(SignalingProtocol protocol,
1636 ActionType type, const Action& action,
1637 buzz::XmlElement* stanza, WriteError* error) {
1638 XmlElements action_elems;
1639 if (!WriteSessionAction(protocol, action, &action_elems, error))
1640 return false;
1641
1642 SessionMessage msg(protocol, type, id(), initiator_name());
1643 msg.to = remote_name();
1644
1645 WriteSessionMessage(msg, action_elems, stanza);
1646 return true;
1647}
1648
1649void Session::SendAcknowledgementMessage(const buzz::XmlElement* stanza) {
1650 talk_base::scoped_ptr<buzz::XmlElement> ack(
1651 new buzz::XmlElement(buzz::QN_IQ));
1652 ack->SetAttr(buzz::QN_TO, remote_name());
1653 ack->SetAttr(buzz::QN_ID, stanza->Attr(buzz::QN_ID));
1654 ack->SetAttr(buzz::QN_TYPE, "result");
1655
1656 SignalOutgoingMessage(this, ack.get());
1657}
1658
1659} // namespace cricket