blob: 345cd5f1006d84848fb0de4d2037a0e3bc1515bf [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2012, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27#include "talk/app/webrtc/datachannel.h"
28
29#include <string>
30
31#include "talk/app/webrtc/webrtcsession.h"
32#include "talk/base/logging.h"
33#include "talk/base/refcount.h"
34
35namespace webrtc {
36
// Threshold for the receive backlog: once more than this many packets are
// waiting in the queue, OnDataReceived() discards everything queued so far.
// Declared const so the limit cannot be changed at runtime by accident.
static const size_t kMaxQueuedDataPackets = 100;
38
39talk_base::scoped_refptr<DataChannel> DataChannel::Create(
40 WebRtcSession* session,
41 const std::string& label,
42 const DataChannelInit* config) {
43 talk_base::scoped_refptr<DataChannel> channel(
44 new talk_base::RefCountedObject<DataChannel>(session, label));
45 if (!channel->Init(config)) {
46 return NULL;
47 }
48 return channel;
49}
50
51DataChannel::DataChannel(WebRtcSession* session, const std::string& label)
52 : label_(label),
53 observer_(NULL),
54 state_(kConnecting),
55 was_ever_writable_(false),
56 session_(session),
57 data_session_(NULL),
58 send_ssrc_set_(false),
59 send_ssrc_(0),
60 receive_ssrc_set_(false),
61 receive_ssrc_(0) {
62}
63
64bool DataChannel::Init(const DataChannelInit* config) {
65 if (config) {
66 if (session_->data_channel_type() == cricket::DCT_RTP &&
67 (config->reliable ||
68 config->id != -1 ||
69 config->maxRetransmits != -1 ||
70 config->maxRetransmitTime != -1)) {
71 LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
72 << "invalid DataChannelInit.";
73 return false;
74 } else if (session_->data_channel_type() == cricket::DCT_SCTP) {
75 if (config->id < -1 ||
76 config->maxRetransmits < -1 ||
77 config->maxRetransmitTime < -1) {
78 LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
79 << "invalid DataChannelInit.";
80 return false;
81 }
82 if (config->maxRetransmits != -1 && config->maxRetransmitTime != -1) {
83 LOG(LS_ERROR) <<
84 "maxRetransmits and maxRetransmitTime should not be both set.";
85 return false;
86 }
87 }
88 config_ = *config;
89 }
90 return true;
91}
92
93bool DataChannel::HasNegotiationCompleted() {
94 return send_ssrc_set_ == receive_ssrc_set_;
95}
96
97DataChannel::~DataChannel() {
98 ClearQueuedData();
99}
100
101void DataChannel::RegisterObserver(DataChannelObserver* observer) {
102 observer_ = observer;
103 DeliverQueuedData();
104}
105
106void DataChannel::UnregisterObserver() {
107 observer_ = NULL;
108}
109
110bool DataChannel::reliable() const {
111 if (session_->data_channel_type() == cricket::DCT_RTP) {
112 return false;
113 } else {
114 return config_.maxRetransmits == -1 &&
115 config_.maxRetransmitTime == -1;
116 }
117}
118
119uint64 DataChannel::buffered_amount() const {
120 return 0;
121}
122
123void DataChannel::Close() {
124 if (state_ == kClosed)
125 return;
126 send_ssrc_ = 0;
127 send_ssrc_set_ = false;
128 SetState(kClosing);
129 UpdateState();
130}
131
132bool DataChannel::Send(const DataBuffer& buffer) {
133 if (state_ != kOpen) {
134 return false;
135 }
136 cricket::SendDataParams send_params;
137
138 send_params.ssrc = send_ssrc_;
139 if (session_->data_channel_type() == cricket::DCT_SCTP) {
140 send_params.ordered = config_.ordered;
141 send_params.max_rtx_count = config_.maxRetransmits;
142 send_params.max_rtx_ms = config_.maxRetransmitTime;
143 }
144 send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
145
146 cricket::SendDataResult send_result;
147 // TODO(pthatcher): Use send_result.would_block for buffering.
148 return session_->data_channel()->SendData(
149 send_params, buffer.data, &send_result);
150}
151
152void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
153 if (receive_ssrc_set_) {
154 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
155 receive_ssrc_ == send_ssrc_);
156 return;
157 }
158 receive_ssrc_ = receive_ssrc;
159 receive_ssrc_set_ = true;
160 UpdateState();
161}
162
163// The remote peer request that this channel shall be closed.
164void DataChannel::RemotePeerRequestClose() {
165 DoClose();
166}
167
168void DataChannel::SetSendSsrc(uint32 send_ssrc) {
169 if (send_ssrc_set_) {
170 ASSERT(session_->data_channel_type() == cricket::DCT_RTP ||
171 receive_ssrc_ == send_ssrc_);
172 return;
173 }
174 send_ssrc_ = send_ssrc;
175 send_ssrc_set_ = true;
176 UpdateState();
177}
178
179// The underlaying data engine is closing.
180// This function make sure the DataChannel is disconneced and change state to
181// kClosed.
182void DataChannel::OnDataEngineClose() {
183 DoClose();
184}
185
186void DataChannel::DoClose() {
187 receive_ssrc_set_ = false;
188 send_ssrc_set_ = false;
189 SetState(kClosing);
190 UpdateState();
191}
192
193void DataChannel::UpdateState() {
194 switch (state_) {
195 case kConnecting: {
196 if (HasNegotiationCompleted()) {
197 if (!IsConnectedToDataSession()) {
198 ConnectToDataSession();
199 }
200 if (was_ever_writable_) {
201 SetState(kOpen);
202 // If we have received buffers before the channel got writable.
203 // Deliver them now.
204 DeliverQueuedData();
205 }
206 }
207 break;
208 }
209 case kOpen: {
210 break;
211 }
212 case kClosing: {
213 if (IsConnectedToDataSession()) {
214 DisconnectFromDataSession();
215 }
216 if (HasNegotiationCompleted()) {
217 SetState(kClosed);
218 }
219 break;
220 }
221 case kClosed:
222 break;
223 }
224}
225
226void DataChannel::SetState(DataState state) {
227 state_ = state;
228 if (observer_) {
229 observer_->OnStateChange();
230 }
231}
232
233void DataChannel::ConnectToDataSession() {
234 ASSERT(session_->data_channel() != NULL);
235 if (!session_->data_channel()) {
236 LOG(LS_ERROR) << "The DataEngine does not exist.";
237 return;
238 }
239
240 data_session_ = session_->data_channel();
241 data_session_->SignalReadyToSendData.connect(this,
242 &DataChannel::OnChannelReady);
243 data_session_->SignalDataReceived.connect(this, &DataChannel::OnDataReceived);
244}
245
246void DataChannel::DisconnectFromDataSession() {
247 data_session_->SignalReadyToSendData.disconnect(this);
248 data_session_->SignalDataReceived.disconnect(this);
249 data_session_ = NULL;
250}
251
252void DataChannel::DeliverQueuedData() {
253 if (was_ever_writable_ && observer_) {
254 while (!queued_data_.empty()) {
255 DataBuffer* buffer = queued_data_.front();
256 observer_->OnMessage(*buffer);
257 queued_data_.pop();
258 delete buffer;
259 }
260 }
261}
262
263void DataChannel::ClearQueuedData() {
264 while (!queued_data_.empty()) {
265 DataBuffer* buffer = queued_data_.front();
266 queued_data_.pop();
267 delete buffer;
268 }
269}
270
271void DataChannel::OnDataReceived(cricket::DataChannel* channel,
272 const cricket::ReceiveDataParams& params,
273 const talk_base::Buffer& payload) {
274 if (params.ssrc == receive_ssrc_) {
275 bool binary = false;
276 talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
277 if (was_ever_writable_ && observer_) {
278 observer_->OnMessage(*buffer.get());
279 } else {
280 if (queued_data_.size() > kMaxQueuedDataPackets) {
281 ClearQueuedData();
282 }
283 queued_data_.push(buffer.release());
284 }
285 }
286}
287
288void DataChannel::OnChannelReady(bool writable) {
289 if (!was_ever_writable_ && writable) {
290 was_ever_writable_ = true;
291 UpdateState();
292 }
293}
294
295} // namespace webrtc