blob: 484886b79c097f3ec13dc662072cad686a839aeb [file] [log] [blame]
Harald Alvestrand00cf34c2019-12-02 09:56:02 +01001/*
2 * Copyright 2019 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11// This file contains the implementation of the class
12// webrtc::PeerConnection::DataChannelController.
13//
14// The intent is that this should be webrtc::DataChannelController, but
15// as a migration stage, it is simpler to have it as an inner class,
16// declared in the header file pc/peer_connection.h
17
18#include "pc/peer_connection.h"
19#include "pc/sctp_utils.h"
20
21namespace webrtc {
22
23bool PeerConnection::DataChannelController::SendData(
24 const cricket::SendDataParams& params,
25 const rtc::CopyOnWriteBuffer& payload,
26 cricket::SendDataResult* result) {
27 // RTC_DCHECK_RUN_ON(signaling_thread());
28 if (data_channel_transport()) {
29 SendDataParams send_params;
30 send_params.type = ToWebrtcDataMessageType(params.type);
31 send_params.ordered = params.ordered;
32 if (params.max_rtx_count >= 0) {
33 send_params.max_rtx_count = params.max_rtx_count;
34 } else if (params.max_rtx_ms >= 0) {
35 send_params.max_rtx_ms = params.max_rtx_ms;
36 }
37
38 RTCError error = network_thread()->Invoke<RTCError>(
39 RTC_FROM_HERE, [this, params, send_params, payload] {
40 return data_channel_transport()->SendData(params.sid, send_params,
41 payload);
42 });
43
44 if (error.ok()) {
45 *result = cricket::SendDataResult::SDR_SUCCESS;
46 return true;
47 } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
48 // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
49 // TODO(mellem): Stop using RTCError here and get rid of the mapping.
50 *result = cricket::SendDataResult::SDR_BLOCK;
51 return false;
52 }
53 *result = cricket::SendDataResult::SDR_ERROR;
54 return false;
55 } else if (rtp_data_channel()) {
56 return rtp_data_channel()->SendData(params, payload, result);
57 }
58 RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
59 return false;
60}
61
62bool PeerConnection::DataChannelController::ConnectDataChannel(
63 DataChannel* webrtc_data_channel) {
64 RTC_DCHECK_RUN_ON(signaling_thread());
65 if (!rtp_data_channel() && !data_channel_transport()) {
66 // Don't log an error here, because DataChannels are expected to call
67 // ConnectDataChannel in this state. It's the only way to initially tell
68 // whether or not the underlying transport is ready.
69 return false;
70 }
71 if (data_channel_transport()) {
72 SignalDataChannelTransportWritable_s.connect(webrtc_data_channel,
73 &DataChannel::OnChannelReady);
74 SignalDataChannelTransportReceivedData_s.connect(
75 webrtc_data_channel, &DataChannel::OnDataReceived);
76 SignalDataChannelTransportChannelClosing_s.connect(
77 webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely);
78 SignalDataChannelTransportChannelClosed_s.connect(
79 webrtc_data_channel, &DataChannel::OnClosingProcedureComplete);
80 }
81 if (rtp_data_channel()) {
82 rtp_data_channel()->SignalReadyToSendData.connect(
83 webrtc_data_channel, &DataChannel::OnChannelReady);
84 rtp_data_channel()->SignalDataReceived.connect(
85 webrtc_data_channel, &DataChannel::OnDataReceived);
86 }
87 return true;
88}
89
90void PeerConnection::DataChannelController::DisconnectDataChannel(
91 DataChannel* webrtc_data_channel) {
92 RTC_DCHECK_RUN_ON(signaling_thread());
93 if (!rtp_data_channel() && !data_channel_transport()) {
94 RTC_LOG(LS_ERROR)
95 << "DisconnectDataChannel called when rtp_data_channel_ and "
96 "sctp_transport_ are NULL.";
97 return;
98 }
99 if (data_channel_transport()) {
100 SignalDataChannelTransportWritable_s.disconnect(webrtc_data_channel);
101 SignalDataChannelTransportReceivedData_s.disconnect(webrtc_data_channel);
102 SignalDataChannelTransportChannelClosing_s.disconnect(webrtc_data_channel);
103 SignalDataChannelTransportChannelClosed_s.disconnect(webrtc_data_channel);
104 }
105 if (rtp_data_channel()) {
106 rtp_data_channel()->SignalReadyToSendData.disconnect(webrtc_data_channel);
107 rtp_data_channel()->SignalDataReceived.disconnect(webrtc_data_channel);
108 }
109}
110
111void PeerConnection::DataChannelController::AddSctpDataStream(int sid) {
112 if (data_channel_transport()) {
113 network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
114 if (data_channel_transport()) {
115 data_channel_transport()->OpenChannel(sid);
116 }
117 });
118 }
119}
120
121void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) {
122 if (data_channel_transport()) {
123 network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
124 if (data_channel_transport()) {
125 data_channel_transport()->CloseChannel(sid);
126 }
127 });
128 }
129}
130
131bool PeerConnection::DataChannelController::ReadyToSendData() const {
132 RTC_DCHECK_RUN_ON(signaling_thread());
133 return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
134 (data_channel_transport() && data_channel_transport_ready_to_send_);
135}
136
137void PeerConnection::DataChannelController::OnDataReceived(
138 int channel_id,
139 DataMessageType type,
140 const rtc::CopyOnWriteBuffer& buffer) {
141 RTC_DCHECK_RUN_ON(network_thread());
142 cricket::ReceiveDataParams params;
143 params.sid = channel_id;
144 params.type = ToCricketDataMessageType(type);
145 data_channel_transport_invoker_->AsyncInvoke<void>(
146 RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
147 RTC_DCHECK_RUN_ON(signaling_thread());
148 if (!HandleOpenMessage_s(params, buffer)) {
149 SignalDataChannelTransportReceivedData_s(params, buffer);
150 }
151 });
152}
153
154void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) {
155 RTC_DCHECK_RUN_ON(network_thread());
156 data_channel_transport_invoker_->AsyncInvoke<void>(
157 RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
158 RTC_DCHECK_RUN_ON(signaling_thread());
159 SignalDataChannelTransportChannelClosing_s(channel_id);
160 });
161}
162
163void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) {
164 RTC_DCHECK_RUN_ON(network_thread());
165 data_channel_transport_invoker_->AsyncInvoke<void>(
166 RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
167 RTC_DCHECK_RUN_ON(signaling_thread());
168 SignalDataChannelTransportChannelClosed_s(channel_id);
169 });
170}
171
172void PeerConnection::DataChannelController::OnReadyToSend() {
173 RTC_DCHECK_RUN_ON(network_thread());
174 data_channel_transport_invoker_->AsyncInvoke<void>(
175 RTC_FROM_HERE, signaling_thread(), [this] {
176 RTC_DCHECK_RUN_ON(signaling_thread());
177 data_channel_transport_ready_to_send_ = true;
178 SignalDataChannelTransportWritable_s(
179 data_channel_transport_ready_to_send_);
180 });
181}
182
183void PeerConnection::DataChannelController::SetupDataChannelTransport_n() {
184 RTC_DCHECK_RUN_ON(network_thread());
185 data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
186}
187
188void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() {
189 RTC_DCHECK_RUN_ON(network_thread());
190 data_channel_transport_invoker_ = nullptr;
191 if (data_channel_transport()) {
192 data_channel_transport()->SetDataSink(nullptr);
193 }
194 set_data_channel_transport(nullptr);
195}
196
197void PeerConnection::DataChannelController::OnTransportChanged(
198 DataChannelTransportInterface* new_data_channel_transport) {
199 RTC_DCHECK_RUN_ON(network_thread());
200 if (data_channel_transport() &&
201 data_channel_transport() != new_data_channel_transport) {
202 // Changed which data channel transport is used for |sctp_mid_| (eg. now
203 // it's bundled).
204 data_channel_transport()->SetDataSink(nullptr);
205 set_data_channel_transport(new_data_channel_transport);
206 if (new_data_channel_transport) {
207 new_data_channel_transport->SetDataSink(this);
208
209 // There's a new data channel transport. This needs to be signaled to the
210 // |sctp_data_channels_| so that they can reopen and reconnect. This is
211 // necessary when bundling is applied.
212 data_channel_transport_invoker_->AsyncInvoke<void>(
213 RTC_FROM_HERE, signaling_thread(), [this] {
214 RTC_DCHECK_RUN_ON(pc_->signaling_thread());
215 for (auto channel : pc_->sctp_data_channels_) {
216 channel->OnTransportChannelCreated();
217 }
218 });
219 }
220 }
221}
222
223bool PeerConnection::DataChannelController::HandleOpenMessage_s(
224 const cricket::ReceiveDataParams& params,
225 const rtc::CopyOnWriteBuffer& buffer) {
226 if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
227 // Received OPEN message; parse and signal that a new data channel should
228 // be created.
229 std::string label;
230 InternalDataChannelInit config;
231 config.id = params.ssrc;
232 if (!ParseDataChannelOpenMessage(buffer, &label, &config)) {
233 RTC_LOG(LS_WARNING) << "Failed to parse the OPEN message for ssrc "
234 << params.ssrc;
235 return true;
236 }
237 config.open_handshake_role = InternalDataChannelInit::kAcker;
238 OnDataChannelOpenMessage(label, config);
239 return true;
240 }
241 return false;
242}
243
244void PeerConnection::DataChannelController::OnDataChannelOpenMessage(
245 const std::string& label,
246 const InternalDataChannelInit& config) {
247 rtc::scoped_refptr<DataChannel> channel(
248 InternalCreateDataChannel(label, &config));
249 if (!channel.get()) {
250 RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
251 return;
252 }
253
254 rtc::scoped_refptr<DataChannelInterface> proxy_channel =
255 DataChannelProxy::Create(signaling_thread(), channel);
256 {
257 RTC_DCHECK_RUN_ON(pc_->signaling_thread());
258 pc_->Observer()->OnDataChannel(std::move(proxy_channel));
259 pc_->NoteUsageEvent(UsageEvent::DATA_ADDED);
260 }
261}
262
263rtc::scoped_refptr<DataChannel>
264PeerConnection::DataChannelController::InternalCreateDataChannel(
265 const std::string& label,
266 const InternalDataChannelInit* config) {
267 RTC_DCHECK_RUN_ON(pc_->signaling_thread());
268 if (pc_->IsClosed()) {
269 return nullptr;
270 }
271 if (pc_->data_channel_type() == cricket::DCT_NONE) {
272 RTC_LOG(LS_ERROR)
273 << "InternalCreateDataChannel: Data is not supported in this call.";
274 return nullptr;
275 }
276 InternalDataChannelInit new_config =
277 config ? (*config) : InternalDataChannelInit();
278 if (DataChannel::IsSctpLike(pc_->data_channel_type_)) {
279 if (new_config.id < 0) {
280 rtc::SSLRole role;
281 if ((pc_->GetSctpSslRole(&role)) &&
282 !sid_allocator_.AllocateSid(role, &new_config.id)) {
283 RTC_LOG(LS_ERROR)
284 << "No id can be allocated for the SCTP data channel.";
285 return nullptr;
286 }
287 } else if (!sid_allocator_.ReserveSid(new_config.id)) {
288 RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
289 "because the id is already in use or out of range.";
290 return nullptr;
291 }
292 }
293
294 rtc::scoped_refptr<DataChannel> channel(
295 DataChannel::Create(this, pc_->data_channel_type(), label, new_config));
296 if (!channel) {
297 sid_allocator_.ReleaseSid(new_config.id);
298 return nullptr;
299 }
300
301 if (channel->data_channel_type() == cricket::DCT_RTP) {
302 if (pc_->rtp_data_channels_.find(channel->label()) !=
303 pc_->rtp_data_channels_.end()) {
304 RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
305 << " already exists.";
306 return nullptr;
307 }
308 pc_->rtp_data_channels_[channel->label()] = channel;
309 } else {
310 RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_));
311 pc_->sctp_data_channels_.push_back(channel);
312 channel->SignalClosed.connect(pc_,
313 &PeerConnection::OnSctpDataChannelClosed);
314 }
315
316 pc_->SignalDataChannelCreated_(channel.get());
317 return channel;
318}
319
320void PeerConnection::DataChannelController::AllocateSctpSids(
321 rtc::SSLRole role) {
322 RTC_DCHECK_RUN_ON(pc_->signaling_thread());
323 std::vector<rtc::scoped_refptr<DataChannel>> channels_to_close;
324 for (const auto& channel : pc_->sctp_data_channels_) {
325 if (channel->id() < 0) {
326 int sid;
327 if (!sid_allocator_.AllocateSid(role, &sid)) {
328 RTC_LOG(LS_ERROR) << "Failed to allocate SCTP sid, closing channel.";
329 channels_to_close.push_back(channel);
330 continue;
331 }
332 channel->SetSctpSid(sid);
333 }
334 }
335 // Since closing modifies the list of channels, we have to do the actual
336 // closing outside the loop.
337 for (const auto& channel : channels_to_close) {
338 channel->CloseAbruptly();
339 }
340}
341
342void PeerConnection::DataChannelController::OnSctpDataChannelClosed(
343 DataChannel* channel) {
344 RTC_DCHECK_RUN_ON(pc_->signaling_thread());
345 for (auto it = pc_->sctp_data_channels_.begin();
346 it != pc_->sctp_data_channels_.end(); ++it) {
347 if (it->get() == channel) {
348 if (channel->id() >= 0) {
349 // After the closing procedure is done, it's safe to use this ID for
350 // another data channel.
351 sid_allocator_.ReleaseSid(channel->id());
352 }
353 // Since this method is triggered by a signal from the DataChannel,
354 // we can't free it directly here; we need to free it asynchronously.
355 pc_->sctp_data_channels_to_free_.push_back(*it);
356 pc_->sctp_data_channels_.erase(it);
357 pc_->SignalFreeDataChannels();
358 return;
359 }
360 }
361}
362
363} // namespace webrtc