blob: 2b45845756fd2bfa790b8baabd56e96ae713ac1e [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "talk/app/webrtc/datachannel.h"
28
29#include <string>
30
31#include "talk/app/webrtc/webrtcsession.h"
32#include "talk/base/logging.h"
33#include "talk/base/refcount.h"
34
35namespace webrtc {
36
// Upper bounds on the number of packets kept in the receive queue (messages
// that arrived before they could be handed to an observer) and in the send
// queue (messages waiting until the transport can send again).
// These are compile-time limits, so they are declared const to prevent
// accidental modification at runtime.
static const size_t kMaxQueuedReceivedDataPackets = 100;
static const size_t kMaxQueuedSendDataPackets = 100;
henrike@webrtc.org28e20752013-07-10 00:45:36 +000039
40talk_base::scoped_refptr<DataChannel> DataChannel::Create(
41 WebRtcSession* session,
42 const std::string& label,
43 const DataChannelInit* config) {
44 talk_base::scoped_refptr<DataChannel> channel(
45 new talk_base::RefCountedObject<DataChannel>(session, label));
46 if (!channel->Init(config)) {
47 return NULL;
48 }
49 return channel;
50}
51
52DataChannel::DataChannel(WebRtcSession* session, const std::string& label)
53 : label_(label),
54 observer_(NULL),
55 state_(kConnecting),
56 was_ever_writable_(false),
57 session_(session),
58 data_session_(NULL),
59 send_ssrc_set_(false),
60 send_ssrc_(0),
61 receive_ssrc_set_(false),
62 receive_ssrc_(0) {
63}
64
65bool DataChannel::Init(const DataChannelInit* config) {
66 if (config) {
67 if (session_->data_channel_type() == cricket::DCT_RTP &&
68 (config->reliable ||
69 config->id != -1 ||
70 config->maxRetransmits != -1 ||
71 config->maxRetransmitTime != -1)) {
72 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
73 << "invalid DataChannelInit.";
74 return false;
75 } else if (session_->data_channel_type() == cricket::DCT_SCTP) {
76 if (config->id < -1 ||
77 config->maxRetransmits < -1 ||
78 config->maxRetransmitTime < -1) {
79 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
80 << "invalid DataChannelInit.";
81 return false;
82 }
83 if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
84 LOG(LS_ERROR) <<
85 "maxRetransmits and maxRetransmitTime should not be both set.";
86 return false;
87 }
88 }
89 config_ = *config;
90 }
91 return true;
92}
93
94bool DataChannel::HasNegotiationCompleted() {
95 return send_ssrc_set_ == receive_ssrc_set_;
96}
97
98DataChannel::~DataChannel() {
wu@webrtc.orgd64719d2013-08-01 00:00:07 +000099 ClearQueuedReceivedData();
100 ClearQueuedSendData();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000101}
102
103void DataChannel::RegisterObserver(DataChannelObserver* observer) {
104 observer_ = observer;
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000105 DeliverQueuedReceivedData();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000106}
107
108void DataChannel::UnregisterObserver() {
109 observer_ = NULL;
110}
111
112bool DataChannel::reliable() const {
113 if (session_->data_channel_type() == cricket::DCT_RTP) {
114 return false;
115 } else {
116 return config_.maxRetransmits == -1 &&
117 config_.maxRetransmitTime == -1;
118 }
119}
120
121uint64 DataChannel::buffered_amount() const {
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000122 uint64 buffered_amount = 0;
123 for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
124 it != queued_send_data_.end();
125 ++it) {
126 buffered_amount += (*it)->size();
127 }
128 return buffered_amount;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000129}
130
131void DataChannel::Close() {
132 if (state_ == kClosed)
133 return;
134 send_ssrc_ = 0;
135 send_ssrc_set_ = false;
136 SetState(kClosing);
137 UpdateState();
138}
139
140bool DataChannel::Send(const DataBuffer& buffer) {
141 if (state_ != kOpen) {
142 return false;
143 }
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000144 // If the queue is non-empty, we're waiting for SignalReadyToSend,
145 // so just add to the end of the queue and keep waiting.
146 if (!queued_send_data_.empty()) {
147 return QueueSendData(buffer);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000148 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000149
150 cricket::SendDataResult send_result;
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000151 if (!InternalSendWithoutQueueing(buffer, &send_result)) {
152 if (send_result == cricket::SDR_BLOCK) {
153 return QueueSendData(buffer);
154 }
155 // Fail for other results.
156 // TODO(jiayl): We should close the data channel in this case.
157 return false;
158 }
159 return true;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000160}
161
wu@webrtc.org91053e72013-08-10 07:18:04 +0000162void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
163 queued_control_data_.push(buffer);
164}
165
166bool DataChannel::SendControl(const talk_base::Buffer* buffer) {
167 if (state_ != kOpen) {
168 QueueControl(buffer);
169 return true;
170 }
171 if (session_->data_channel_type() == cricket::DCT_RTP) {
172 delete buffer;
173 return false;
174 }
175
176 cricket::SendDataParams send_params;
177 send_params.ssrc = config_.id;
178 send_params.ordered = true;
179 send_params.type = cricket::DMT_CONTROL;
180
181 cricket::SendDataResult send_result;
182 bool retval = session_->data_channel()->SendData(
183 send_params, *buffer, &send_result);
184 if (!retval && send_result == cricket::SDR_BLOCK) {
185 // Link is congested. Queue for later.
186 QueueControl(buffer);
187 } else {
188 delete buffer;
189 }
190 return retval;
191}
192
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000193void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
194 if (receive_ssrc_set_) {
195 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
196 receive_ssrc_ == send_ssrc_);
197 return;
198 }
199 receive_ssrc_ = receive_ssrc;
200 receive_ssrc_set_ = true;
201 UpdateState();
202}
203
204// The remote peer request that this channel shall be closed.
205void DataChannel::RemotePeerRequestClose() {
206 DoClose();
207}
208
209void DataChannel::SetSendSsrc(uint32 send_ssrc) {
210 if (send_ssrc_set_) {
211 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
212 receive_ssrc_ == send_ssrc_);
213 return;
214 }
215 send_ssrc_ = send_ssrc;
216 send_ssrc_set_ = true;
217 UpdateState();
218}
219
220// The underlaying data engine is closing.
221// This function make sure the DataChannel is disconneced and change state to
222// kClosed.
223void DataChannel::OnDataEngineClose() {
224 DoClose();
225}
226
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000227void DataChannel::OnDataReceived(cricket::DataChannel* channel,
228 const cricket::ReceiveDataParams& params,
229 const talk_base::Buffer& payload) {
230 if (params.ssrc != receive_ssrc_) {
231 return;
232 }
233
234 bool binary = (params.type == cricket::DMT_BINARY);
235 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
236 if (was_ever_writable_ && observer_) {
237 observer_->OnMessage(*buffer.get());
238 } else {
239 if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
240 // TODO(jiayl): We should close the data channel in this case.
241 LOG(LS_ERROR)
242 << "Queued received data exceeds the max number of packes.";
243 ClearQueuedReceivedData();
244 }
245 queued_received_data_.push(buffer.release());
246 }
247}
248
249void DataChannel::OnChannelReady(bool writable) {
250 if (!writable) {
251 return;
252 }
253 // Update the readyState if the channel is writable for the first time;
254 // otherwise it means the channel was blocked for sending and now unblocked,
255 // so send the queued data now.
256 if (!was_ever_writable_) {
257 was_ever_writable_ = true;
258 UpdateState();
259 } else if (state_ == kOpen) {
260 SendQueuedSendData();
261 }
262}
263
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000264void DataChannel::DoClose() {
265 receive_ssrc_set_ = false;
266 send_ssrc_set_ = false;
267 SetState(kClosing);
268 UpdateState();
269}
270
271void DataChannel::UpdateState() {
272 switch (state_) {
273 case kConnecting: {
274 if (HasNegotiationCompleted()) {
275 if (!IsConnectedToDataSession()) {
276 ConnectToDataSession();
277 }
278 if (was_ever_writable_) {
279 SetState(kOpen);
280 // If we have received buffers before the channel got writable.
281 // Deliver them now.
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000282 DeliverQueuedReceivedData();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000283 }
284 }
285 break;
286 }
287 case kOpen: {
288 break;
289 }
290 case kClosing: {
291 if (IsConnectedToDataSession()) {
292 DisconnectFromDataSession();
293 }
294 if (HasNegotiationCompleted()) {
295 SetState(kClosed);
296 }
297 break;
298 }
299 case kClosed:
300 break;
301 }
302}
303
304void DataChannel::SetState(DataState state) {
305 state_ = state;
306 if (observer_) {
307 observer_->OnStateChange();
308 }
309}
310
311void DataChannel::ConnectToDataSession() {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000312 if (!session_->data_channel()) {
313 LOG(LS_ERROR) << "The DataEngine does not exist.";
wu@webrtc.org91053e72013-08-10 07:18:04 +0000314 ASSERT(session_->data_channel() != NULL);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000315 return;
316 }
317
318 data_session_ = session_->data_channel();
319 data_session_->SignalReadyToSendData.connect(this,
320 &DataChannel::OnChannelReady);
321 data_session_->SignalDataReceived.connect(this, &DataChannel::OnDataReceived);
wu@webrtc.org91053e72013-08-10 07:18:04 +0000322 cricket::StreamParams params =
323 cricket::StreamParams::CreateLegacy(id());
324 data_session_->media_channel()->AddSendStream(params);
325 data_session_->media_channel()->AddRecvStream(params);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000326}
327
328void DataChannel::DisconnectFromDataSession() {
wu@webrtc.org91053e72013-08-10 07:18:04 +0000329 if (data_session_->media_channel() != NULL) {
330 data_session_->media_channel()->RemoveSendStream(id());
331 data_session_->media_channel()->RemoveRecvStream(id());
332 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000333 data_session_->SignalReadyToSendData.disconnect(this);
334 data_session_->SignalDataReceived.disconnect(this);
335 data_session_ = NULL;
336}
337
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000338void DataChannel::DeliverQueuedReceivedData() {
339 if (!was_ever_writable_ || !observer_) {
340 return;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000341 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000342
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000343 while (!queued_received_data_.empty()) {
344 DataBuffer* buffer = queued_received_data_.front();
345 observer_->OnMessage(*buffer);
346 queued_received_data_.pop();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000347 delete buffer;
348 }
349}
350
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000351void DataChannel::ClearQueuedReceivedData() {
352 while (!queued_received_data_.empty()) {
353 DataBuffer* buffer = queued_received_data_.front();
354 queued_received_data_.pop();
355 delete buffer;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000356 }
357}
358
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000359void DataChannel::SendQueuedSendData() {
wu@webrtc.org91053e72013-08-10 07:18:04 +0000360 DeliverQueuedControlData();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000361 if (!was_ever_writable_) {
362 return;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000363 }
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000364
365 while (!queued_send_data_.empty()) {
366 DataBuffer* buffer = queued_send_data_.front();
367 cricket::SendDataResult send_result;
368 if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
369 LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
370 << send_result;
371 break;
372 }
373 queued_send_data_.pop_front();
374 delete buffer;
375 }
376}
377
wu@webrtc.org91053e72013-08-10 07:18:04 +0000378void DataChannel::DeliverQueuedControlData() {
379 if (was_ever_writable_) {
380 while (!queued_control_data_.empty()) {
381 const talk_base::Buffer *buf = queued_control_data_.front();
382 queued_control_data_.pop();
383 SendControl(buf);
384 }
385 }
386}
387
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000388void DataChannel::ClearQueuedSendData() {
wu@webrtc.orga0545692013-08-01 22:08:14 +0000389 while (!queued_send_data_.empty()) {
390 DataBuffer* buffer = queued_send_data_.front();
391 queued_send_data_.pop_front();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000392 delete buffer;
393 }
394}
395
396bool DataChannel::InternalSendWithoutQueueing(
397 const DataBuffer& buffer, cricket::SendDataResult* send_result) {
398 cricket::SendDataParams send_params;
399
400 send_params.ssrc = send_ssrc_;
401 if (session_->data_channel_type() == cricket::DCT_SCTP) {
402 send_params.ordered = config_.ordered;
403 send_params.max_rtx_count = config_.maxRetransmits;
404 send_params.max_rtx_ms = config_.maxRetransmitTime;
405 }
406 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
407
408 return session_->data_channel()->SendData(send_params, buffer.data,
409 send_result);
410}
411
412bool DataChannel::QueueSendData(const DataBuffer& buffer) {
413 if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
414 LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
415 return false;
416 }
417 queued_send_data_.push_back(new DataBuffer(buffer));
418 return true;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000419}
420
421} // namespace webrtc