blob: 9409fd7a2101f9843e0f3d9aca2d2bda38f31afd [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "talk/app/webrtc/datachannel.h"
28
29#include <string>
30
31#include "talk/app/webrtc/webrtcsession.h"
32#include "talk/base/logging.h"
33#include "talk/base/refcount.h"
34
35namespace webrtc {
36
wu@webrtc.orgd64719d2013-08-01 00:00:07 +000037static size_t kMaxQueuedReceivedDataPackets = 100;
38static size_t kMaxQueuedSendDataPackets = 100;
henrike@webrtc.org28e20752013-07-10 00:45:36 +000039
40talk_base::scoped_refptr<DataChannel> DataChannel::Create(
41 WebRtcSession* session,
42 const std::string& label,
43 const DataChannelInit* config) {
44 talk_base::scoped_refptr<DataChannel> channel(
45 new talk_base::RefCountedObject<DataChannel>(session, label));
46 if (!channel->Init(config)) {
47 return NULL;
48 }
49 return channel;
50}
51
52DataChannel::DataChannel(WebRtcSession* session, const std::string& label)
53 : label_(label),
54 observer_(NULL),
55 state_(kConnecting),
56 was_ever_writable_(false),
57 session_(session),
58 data_session_(NULL),
59 send_ssrc_set_(false),
60 send_ssrc_(0),
61 receive_ssrc_set_(false),
62 receive_ssrc_(0) {
63}
64
65bool DataChannel::Init(const DataChannelInit* config) {
66 if (config) {
67 if (session_->data_channel_type() == cricket::DCT_RTP &&
68 (config->reliable ||
69 config->id != -1 ||
70 config->maxRetransmits != -1 ||
71 config->maxRetransmitTime != -1)) {
72 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
73 << "invalid DataChannelInit.";
74 return false;
75 } else if (session_->data_channel_type() == cricket::DCT_SCTP) {
76 if (config->id < -1 ||
77 config->maxRetransmits < -1 ||
78 config->maxRetransmitTime < -1) {
79 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
80 << "invalid DataChannelInit.";
81 return false;
82 }
83 if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
84 LOG(LS_ERROR) <<
85 "maxRetransmits and maxRetransmitTime should not be both set.";
86 return false;
87 }
88 }
89 config_ = *config;
90 }
91 return true;
92}
93
94bool DataChannel::HasNegotiationCompleted() {
95 return send_ssrc_set_ == receive_ssrc_set_;
96}
97
98DataChannel::~DataChannel() {
wu@webrtc.orgd64719d2013-08-01 00:00:07 +000099 ClearQueuedReceivedData();
100 ClearQueuedSendData();
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000101 ClearQueuedControlData();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000102}
103
104void DataChannel::RegisterObserver(DataChannelObserver* observer) {
105 observer_ = observer;
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000106 DeliverQueuedReceivedData();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000107}
108
109void DataChannel::UnregisterObserver() {
110 observer_ = NULL;
111}
112
113bool DataChannel::reliable() const {
114 if (session_->data_channel_type() == cricket::DCT_RTP) {
115 return false;
116 } else {
117 return config_.maxRetransmits == -1 &&
118 config_.maxRetransmitTime == -1;
119 }
120}
121
122uint64 DataChannel::buffered_amount() const {
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000123 uint64 buffered_amount = 0;
124 for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
125 it != queued_send_data_.end();
126 ++it) {
127 buffered_amount += (*it)->size();
128 }
129 return buffered_amount;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000130}
131
132void DataChannel::Close() {
133 if (state_ == kClosed)
134 return;
135 send_ssrc_ = 0;
136 send_ssrc_set_ = false;
137 SetState(kClosing);
138 UpdateState();
139}
140
141bool DataChannel::Send(const DataBuffer& buffer) {
142 if (state_ != kOpen) {
143 return false;
144 }
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000145 // If the queue is non-empty, we're waiting for SignalReadyToSend,
146 // so just add to the end of the queue and keep waiting.
147 if (!queued_send_data_.empty()) {
148 return QueueSendData(buffer);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000149 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000150
151 cricket::SendDataResult send_result;
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000152 if (!InternalSendWithoutQueueing(buffer, &send_result)) {
153 if (send_result == cricket::SDR_BLOCK) {
154 return QueueSendData(buffer);
155 }
156 // Fail for other results.
157 // TODO(jiayl): We should close the data channel in this case.
158 return false;
159 }
160 return true;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000161}
162
wu@webrtc.org91053e72013-08-10 07:18:04 +0000163void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
164 queued_control_data_.push(buffer);
165}
166
167bool DataChannel::SendControl(const talk_base::Buffer* buffer) {
168 if (state_ != kOpen) {
169 QueueControl(buffer);
170 return true;
171 }
172 if (session_->data_channel_type() == cricket::DCT_RTP) {
173 delete buffer;
174 return false;
175 }
176
177 cricket::SendDataParams send_params;
178 send_params.ssrc = config_.id;
179 send_params.ordered = true;
180 send_params.type = cricket::DMT_CONTROL;
181
182 cricket::SendDataResult send_result;
183 bool retval = session_->data_channel()->SendData(
184 send_params, *buffer, &send_result);
185 if (!retval && send_result == cricket::SDR_BLOCK) {
186 // Link is congested. Queue for later.
187 QueueControl(buffer);
188 } else {
189 delete buffer;
190 }
191 return retval;
192}
193
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000194void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
195 if (receive_ssrc_set_) {
196 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
sergeyu@chromium.orga59696b2013-09-13 23:48:58 +0000197 !send_ssrc_set_ ||
198 receive_ssrc_ == send_ssrc_);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000199 return;
200 }
201 receive_ssrc_ = receive_ssrc;
202 receive_ssrc_set_ = true;
203 UpdateState();
204}
205
206// The remote peer request that this channel shall be closed.
207void DataChannel::RemotePeerRequestClose() {
208 DoClose();
209}
210
211void DataChannel::SetSendSsrc(uint32 send_ssrc) {
212 if (send_ssrc_set_) {
213 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
sergeyu@chromium.orga59696b2013-09-13 23:48:58 +0000214 !receive_ssrc_set_ ||
215 receive_ssrc_ == send_ssrc_);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000216 return;
217 }
218 send_ssrc_ = send_ssrc;
219 send_ssrc_set_ = true;
220 UpdateState();
221}
222
223// The underlaying data engine is closing.
224// This function make sure the DataChannel is disconneced and change state to
225// kClosed.
226void DataChannel::OnDataEngineClose() {
227 DoClose();
228}
229
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000230void DataChannel::OnDataReceived(cricket::DataChannel* channel,
231 const cricket::ReceiveDataParams& params,
232 const talk_base::Buffer& payload) {
233 if (params.ssrc != receive_ssrc_) {
234 return;
235 }
236
237 bool binary = (params.type == cricket::DMT_BINARY);
238 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
239 if (was_ever_writable_ && observer_) {
240 observer_->OnMessage(*buffer.get());
241 } else {
242 if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
243 // TODO(jiayl): We should close the data channel in this case.
244 LOG(LS_ERROR)
245 << "Queued received data exceeds the max number of packes.";
246 ClearQueuedReceivedData();
247 }
248 queued_received_data_.push(buffer.release());
249 }
250}
251
252void DataChannel::OnChannelReady(bool writable) {
253 if (!writable) {
254 return;
255 }
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000256 // Update the readyState and send the queued control message if the channel
257 // is writable for the first time; otherwise it means the channel was blocked
258 // for sending and now unblocked, so send the queued data now.
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000259 if (!was_ever_writable_) {
260 was_ever_writable_ = true;
261 UpdateState();
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000262 DeliverQueuedControlData();
263 ASSERT(queued_send_data_.empty());
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000264 } else if (state_ == kOpen) {
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000265 DeliverQueuedSendData();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000266 }
267}
268
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000269void DataChannel::DoClose() {
270 receive_ssrc_set_ = false;
271 send_ssrc_set_ = false;
272 SetState(kClosing);
273 UpdateState();
274}
275
276void DataChannel::UpdateState() {
277 switch (state_) {
278 case kConnecting: {
279 if (HasNegotiationCompleted()) {
280 if (!IsConnectedToDataSession()) {
281 ConnectToDataSession();
282 }
283 if (was_ever_writable_) {
284 SetState(kOpen);
285 // If we have received buffers before the channel got writable.
286 // Deliver them now.
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000287 DeliverQueuedReceivedData();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000288 }
289 }
290 break;
291 }
292 case kOpen: {
293 break;
294 }
295 case kClosing: {
296 if (IsConnectedToDataSession()) {
297 DisconnectFromDataSession();
298 }
299 if (HasNegotiationCompleted()) {
300 SetState(kClosed);
301 }
302 break;
303 }
304 case kClosed:
305 break;
306 }
307}
308
309void DataChannel::SetState(DataState state) {
310 state_ = state;
311 if (observer_) {
312 observer_->OnStateChange();
313 }
314}
315
316void DataChannel::ConnectToDataSession() {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000317 if (!session_->data_channel()) {
318 LOG(LS_ERROR) << "The DataEngine does not exist.";
wu@webrtc.org91053e72013-08-10 07:18:04 +0000319 ASSERT(session_->data_channel() != NULL);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000320 return;
321 }
322
323 data_session_ = session_->data_channel();
324 data_session_->SignalReadyToSendData.connect(this,
325 &DataChannel::OnChannelReady);
326 data_session_->SignalDataReceived.connect(this, &DataChannel::OnDataReceived);
wu@webrtc.org91053e72013-08-10 07:18:04 +0000327 cricket::StreamParams params =
328 cricket::StreamParams::CreateLegacy(id());
wu@webrtc.orgcadf9042013-08-30 21:24:16 +0000329 data_session_->AddRecvStream(params);
330 data_session_->AddSendStream(params);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000331}
332
333void DataChannel::DisconnectFromDataSession() {
wu@webrtc.orgcadf9042013-08-30 21:24:16 +0000334 data_session_->RemoveSendStream(id());
335 data_session_->RemoveRecvStream(id());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000336 data_session_->SignalReadyToSendData.disconnect(this);
337 data_session_->SignalDataReceived.disconnect(this);
338 data_session_ = NULL;
339}
340
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000341void DataChannel::DeliverQueuedReceivedData() {
342 if (!was_ever_writable_ || !observer_) {
343 return;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000344 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000345
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000346 while (!queued_received_data_.empty()) {
347 DataBuffer* buffer = queued_received_data_.front();
348 observer_->OnMessage(*buffer);
349 queued_received_data_.pop();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000350 delete buffer;
351 }
352}
353
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000354void DataChannel::ClearQueuedReceivedData() {
355 while (!queued_received_data_.empty()) {
356 DataBuffer* buffer = queued_received_data_.front();
357 queued_received_data_.pop();
358 delete buffer;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000359 }
360}
361
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000362void DataChannel::DeliverQueuedSendData() {
wu@webrtc.org91053e72013-08-10 07:18:04 +0000363 DeliverQueuedControlData();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000364 if (!was_ever_writable_) {
365 return;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000366 }
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000367
368 while (!queued_send_data_.empty()) {
369 DataBuffer* buffer = queued_send_data_.front();
370 cricket::SendDataResult send_result;
371 if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000372 LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000373 << send_result;
374 break;
375 }
376 queued_send_data_.pop_front();
377 delete buffer;
378 }
379}
380
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000381void DataChannel::ClearQueuedControlData() {
382 while (!queued_control_data_.empty()) {
383 const talk_base::Buffer *buf = queued_control_data_.front();
384 queued_control_data_.pop();
385 delete buf;
386 }
387}
388
wu@webrtc.org91053e72013-08-10 07:18:04 +0000389void DataChannel::DeliverQueuedControlData() {
390 if (was_ever_writable_) {
391 while (!queued_control_data_.empty()) {
392 const talk_base::Buffer *buf = queued_control_data_.front();
393 queued_control_data_.pop();
394 SendControl(buf);
395 }
396 }
397}
398
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000399void DataChannel::ClearQueuedSendData() {
wu@webrtc.orga0545692013-08-01 22:08:14 +0000400 while (!queued_send_data_.empty()) {
401 DataBuffer* buffer = queued_send_data_.front();
402 queued_send_data_.pop_front();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000403 delete buffer;
404 }
405}
406
407bool DataChannel::InternalSendWithoutQueueing(
408 const DataBuffer& buffer, cricket::SendDataResult* send_result) {
409 cricket::SendDataParams send_params;
410
411 send_params.ssrc = send_ssrc_;
412 if (session_->data_channel_type() == cricket::DCT_SCTP) {
413 send_params.ordered = config_.ordered;
414 send_params.max_rtx_count = config_.maxRetransmits;
415 send_params.max_rtx_ms = config_.maxRetransmitTime;
416 }
417 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
418
419 return session_->data_channel()->SendData(send_params, buffer.data,
420 send_result);
421}
422
423bool DataChannel::QueueSendData(const DataBuffer& buffer) {
424 if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
425 LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
426 return false;
427 }
428 queued_send_data_.push_back(new DataBuffer(buffer));
429 return true;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000430}
431
432} // namespace webrtc