blob: 8b3f1ac6df6bf4f1a17df361bfe1cbb5a7285a99 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "talk/app/webrtc/datachannel.h"
28
29#include <string>
30
31#include "talk/app/webrtc/webrtcsession.h"
32#include "talk/base/logging.h"
33#include "talk/base/refcount.h"
34
35namespace webrtc {
36
// Upper bounds on the number of packets a channel will buffer: received
// packets that cannot be delivered to an observer yet, and outgoing packets
// that could not be sent yet. Declared const so the limits cannot be
// changed at runtime.
static const size_t kMaxQueuedReceivedDataPackets = 100;
static const size_t kMaxQueuedSendDataPackets = 100;
henrike@webrtc.org28e20752013-07-10 00:45:36 +000039
40talk_base::scoped_refptr<DataChannel> DataChannel::Create(
41 WebRtcSession* session,
42 const std::string& label,
43 const DataChannelInit* config) {
44 talk_base::scoped_refptr<DataChannel> channel(
45 new talk_base::RefCountedObject<DataChannel>(session, label));
46 if (!channel->Init(config)) {
47 return NULL;
48 }
49 return channel;
50}
51
// Constructs a channel named |label| that belongs to |session|.
// The channel starts in the kConnecting state with no observer registered,
// no data session attached, has never been writable, and has neither its
// send SSRC nor its receive SSRC set.
DataChannel::DataChannel(WebRtcSession* session, const std::string& label)
    : label_(label),
      observer_(NULL),
      state_(kConnecting),
      was_ever_writable_(false),
      session_(session),
      data_session_(NULL),
      send_ssrc_set_(false),
      send_ssrc_(0),
      receive_ssrc_set_(false),
      receive_ssrc_(0) {
}
64
65bool DataChannel::Init(const DataChannelInit* config) {
66 if (config) {
67 if (session_->data_channel_type() == cricket::DCT_RTP &&
68 (config->reliable ||
69 config->id != -1 ||
70 config->maxRetransmits != -1 ||
71 config->maxRetransmitTime != -1)) {
72 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
73 << "invalid DataChannelInit.";
74 return false;
75 } else if (session_->data_channel_type() == cricket::DCT_SCTP) {
76 if (config->id < -1 ||
77 config->maxRetransmits < -1 ||
78 config->maxRetransmitTime < -1) {
79 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
80 << "invalid DataChannelInit.";
81 return false;
82 }
83 if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
84 LOG(LS_ERROR) <<
85 "maxRetransmits and maxRetransmitTime should not be both set.";
86 return false;
87 }
88 }
89 config_ = *config;
90 }
91 return true;
92}
93
// Returns true when the send and receive SSRC flags agree, i.e. both SSRCs
// have been set or both are unset. UpdateState() consults this to decide when
// a connecting channel may attach to the data session and when a closing
// channel may become kClosed.
bool DataChannel::HasNegotiationCompleted() {
  return send_ssrc_set_ == receive_ssrc_set_;
}
97
// Frees every buffer still held in the received-data, send-data and
// control-message queues.
DataChannel::~DataChannel() {
  ClearQueuedReceivedData();
  ClearQueuedSendData();
  ClearQueuedControlData();
}
103
// Installs |observer| as the receiver of this channel's callbacks, then tries
// to hand over any received data that was queued earlier.
// DeliverQueuedReceivedData() does nothing until the channel has been
// writable at least once.
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
  observer_ = observer;
  DeliverQueuedReceivedData();
}
108
// Removes the current observer. While no observer is registered, state
// changes are not reported and incoming data is queued instead of delivered.
void DataChannel::UnregisterObserver() {
  observer_ = NULL;
}
112
113bool DataChannel::reliable() const {
114 if (session_->data_channel_type() == cricket::DCT_RTP) {
115 return false;
116 } else {
117 return config_.maxRetransmits == -1 &&
118 config_.maxRetransmitTime == -1;
119 }
120}
121
122uint64 DataChannel::buffered_amount() const {
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000123 uint64 buffered_amount = 0;
124 for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
125 it != queued_send_data_.end();
126 ++it) {
127 buffered_amount += (*it)->size();
128 }
129 return buffered_amount;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000130}
131
132void DataChannel::Close() {
133 if (state_ == kClosed)
134 return;
135 send_ssrc_ = 0;
136 send_ssrc_set_ = false;
137 SetState(kClosing);
138 UpdateState();
139}
140
141bool DataChannel::Send(const DataBuffer& buffer) {
142 if (state_ != kOpen) {
143 return false;
144 }
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000145 // If the queue is non-empty, we're waiting for SignalReadyToSend,
146 // so just add to the end of the queue and keep waiting.
147 if (!queued_send_data_.empty()) {
148 return QueueSendData(buffer);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000149 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000150
151 cricket::SendDataResult send_result;
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000152 if (!InternalSendWithoutQueueing(buffer, &send_result)) {
153 if (send_result == cricket::SDR_BLOCK) {
154 return QueueSendData(buffer);
155 }
156 // Fail for other results.
157 // TODO(jiayl): We should close the data channel in this case.
158 return false;
159 }
160 return true;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000161}
162
// Appends |buffer| to the control-message queue. The queue stores the raw
// pointer; queued buffers are later handed to SendControl() by
// DeliverQueuedControlData() or deleted by ClearQueuedControlData().
void DataChannel::QueueControl(const talk_base::Buffer* buffer) {
  queued_control_data_.push(buffer);
}
166
167bool DataChannel::SendControl(const talk_base::Buffer* buffer) {
168 if (state_ != kOpen) {
169 QueueControl(buffer);
170 return true;
171 }
172 if (session_->data_channel_type() == cricket::DCT_RTP) {
173 delete buffer;
174 return false;
175 }
176
177 cricket::SendDataParams send_params;
178 send_params.ssrc = config_.id;
179 send_params.ordered = true;
180 send_params.type = cricket::DMT_CONTROL;
181
182 cricket::SendDataResult send_result;
183 bool retval = session_->data_channel()->SendData(
184 send_params, *buffer, &send_result);
185 if (!retval && send_result == cricket::SDR_BLOCK) {
186 // Link is congested. Queue for later.
187 QueueControl(buffer);
188 } else {
189 delete buffer;
190 }
191 return retval;
192}
193
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000194void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
195 if (receive_ssrc_set_) {
196 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
197 receive_ssrc_ == send_ssrc_);
198 return;
199 }
200 receive_ssrc_ = receive_ssrc;
201 receive_ssrc_set_ = true;
202 UpdateState();
203}
204
// The remote peer has requested that this channel be closed.
// Delegates to DoClose().
void DataChannel::RemotePeerRequestClose() {
  DoClose();
}
209
210void DataChannel::SetSendSsrc(uint32 send_ssrc) {
211 if (send_ssrc_set_) {
212 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
213 receive_ssrc_ == send_ssrc_);
214 return;
215 }
216 send_ssrc_ = send_ssrc;
217 send_ssrc_set_ = true;
218 UpdateState();
219}
220
// The underlying data engine is closing.
// This function makes sure the DataChannel is disconnected and changes its
// state to kClosed. Delegates to DoClose().
void DataChannel::OnDataEngineClose() {
  DoClose();
}
227
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000228void DataChannel::OnDataReceived(cricket::DataChannel* channel,
229 const cricket::ReceiveDataParams& params,
230 const talk_base::Buffer& payload) {
231 if (params.ssrc != receive_ssrc_) {
232 return;
233 }
234
235 bool binary = (params.type == cricket::DMT_BINARY);
236 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
237 if (was_ever_writable_ && observer_) {
238 observer_->OnMessage(*buffer.get());
239 } else {
240 if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
241 // TODO(jiayl): We should close the data channel in this case.
242 LOG(LS_ERROR)
243 << "Queued received data exceeds the max number of packes.";
244 ClearQueuedReceivedData();
245 }
246 queued_received_data_.push(buffer.release());
247 }
248}
249
250void DataChannel::OnChannelReady(bool writable) {
251 if (!writable) {
252 return;
253 }
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000254 // Update the readyState and send the queued control message if the channel
255 // is writable for the first time; otherwise it means the channel was blocked
256 // for sending and now unblocked, so send the queued data now.
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000257 if (!was_ever_writable_) {
258 was_ever_writable_ = true;
259 UpdateState();
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000260 DeliverQueuedControlData();
261 ASSERT(queued_send_data_.empty());
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000262 } else if (state_ == kOpen) {
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000263 DeliverQueuedSendData();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000264 }
265}
266
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000267void DataChannel::DoClose() {
268 receive_ssrc_set_ = false;
269 send_ssrc_set_ = false;
270 SetState(kClosing);
271 UpdateState();
272}
273
// Advances the channel state machine based on the current SSRC flags,
// writability and data session connection.
//
//  kConnecting: once HasNegotiationCompleted() is true, attach to the data
//               session if not attached yet; if the channel has ever been
//               writable, move to kOpen and flush queued received data.
//  kOpen:       nothing to do.
//  kClosing:    detach from the data session if attached; once
//               HasNegotiationCompleted() is true, move to kClosed.
//  kClosed:     nothing to do.
void DataChannel::UpdateState() {
  switch (state_) {
    case kConnecting: {
      if (HasNegotiationCompleted()) {
        if (!IsConnectedToDataSession()) {
          ConnectToDataSession();
        }
        if (was_ever_writable_) {
          SetState(kOpen);
          // If we have received buffers before the channel got writable.
          // Deliver them now.
          DeliverQueuedReceivedData();
        }
      }
      break;
    }
    case kOpen: {
      break;
    }
    case kClosing: {
      // Detach first so no further signals from the data session reach this
      // channel while it is shutting down.
      if (IsConnectedToDataSession()) {
        DisconnectFromDataSession();
      }
      if (HasNegotiationCompleted()) {
        SetState(kClosed);
      }
      break;
    }
    case kClosed:
      break;
  }
}
306
307void DataChannel::SetState(DataState state) {
308 state_ = state;
309 if (observer_) {
310 observer_->OnStateChange();
311 }
312}
313
314void DataChannel::ConnectToDataSession() {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000315 if (!session_->data_channel()) {
316 LOG(LS_ERROR) << "The DataEngine does not exist.";
wu@webrtc.org91053e72013-08-10 07:18:04 +0000317 ASSERT(session_->data_channel() != NULL);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000318 return;
319 }
320
321 data_session_ = session_->data_channel();
322 data_session_->SignalReadyToSendData.connect(this,
323 &DataChannel::OnChannelReady);
324 data_session_->SignalDataReceived.connect(this, &DataChannel::OnDataReceived);
wu@webrtc.org91053e72013-08-10 07:18:04 +0000325 cricket::StreamParams params =
326 cricket::StreamParams::CreateLegacy(id());
327 data_session_->media_channel()->AddSendStream(params);
328 data_session_->media_channel()->AddRecvStream(params);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000329}
330
331void DataChannel::DisconnectFromDataSession() {
wu@webrtc.org91053e72013-08-10 07:18:04 +0000332 if (data_session_->media_channel() != NULL) {
333 data_session_->media_channel()->RemoveSendStream(id());
334 data_session_->media_channel()->RemoveRecvStream(id());
335 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000336 data_session_->SignalReadyToSendData.disconnect(this);
337 data_session_->SignalDataReceived.disconnect(this);
338 data_session_ = NULL;
339}
340
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000341void DataChannel::DeliverQueuedReceivedData() {
342 if (!was_ever_writable_ || !observer_) {
343 return;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000344 }
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000345
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000346 while (!queued_received_data_.empty()) {
347 DataBuffer* buffer = queued_received_data_.front();
348 observer_->OnMessage(*buffer);
349 queued_received_data_.pop();
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000350 delete buffer;
351 }
352}
353
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000354void DataChannel::ClearQueuedReceivedData() {
355 while (!queued_received_data_.empty()) {
356 DataBuffer* buffer = queued_received_data_.front();
357 queued_received_data_.pop();
358 delete buffer;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000359 }
360}
361
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000362void DataChannel::DeliverQueuedSendData() {
wu@webrtc.org91053e72013-08-10 07:18:04 +0000363 DeliverQueuedControlData();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000364 if (!was_ever_writable_) {
365 return;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000366 }
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000367
368 while (!queued_send_data_.empty()) {
369 DataBuffer* buffer = queued_send_data_.front();
370 cricket::SendDataResult send_result;
371 if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000372 LOG(LS_WARNING) << "DeliverQueuedSendData aborted due to send_result "
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000373 << send_result;
374 break;
375 }
376 queued_send_data_.pop_front();
377 delete buffer;
378 }
379}
380
wu@webrtc.org822fbd82013-08-15 23:38:54 +0000381void DataChannel::ClearQueuedControlData() {
382 while (!queued_control_data_.empty()) {
383 const talk_base::Buffer *buf = queued_control_data_.front();
384 queued_control_data_.pop();
385 delete buf;
386 }
387}
388
wu@webrtc.org91053e72013-08-10 07:18:04 +0000389void DataChannel::DeliverQueuedControlData() {
390 if (was_ever_writable_) {
391 while (!queued_control_data_.empty()) {
392 const talk_base::Buffer *buf = queued_control_data_.front();
393 queued_control_data_.pop();
394 SendControl(buf);
395 }
396 }
397}
398
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000399void DataChannel::ClearQueuedSendData() {
wu@webrtc.orga0545692013-08-01 22:08:14 +0000400 while (!queued_send_data_.empty()) {
401 DataBuffer* buffer = queued_send_data_.front();
402 queued_send_data_.pop_front();
wu@webrtc.orgd64719d2013-08-01 00:00:07 +0000403 delete buffer;
404 }
405}
406
407bool DataChannel::InternalSendWithoutQueueing(
408 const DataBuffer& buffer, cricket::SendDataResult* send_result) {
409 cricket::SendDataParams send_params;
410
411 send_params.ssrc = send_ssrc_;
412 if (session_->data_channel_type() == cricket::DCT_SCTP) {
413 send_params.ordered = config_.ordered;
414 send_params.max_rtx_count = config_.maxRetransmits;
415 send_params.max_rtx_ms = config_.maxRetransmitTime;
416 }
417 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
418
419 return session_->data_channel()->SendData(send_params, buffer.data,
420 send_result);
421}
422
423bool DataChannel::QueueSendData(const DataBuffer& buffer) {
424 if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
425 LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
426 return false;
427 }
428 queued_send_data_.push_back(new DataBuffer(buffer));
429 return true;
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000430}
431
432} // namespace webrtc