blob: 356493658ac93e89521bff7b5969b804f0f072d0 [file] [log] [blame]
Taylor Brandstetter3a034e12020-07-09 15:32:34 -07001/*
2 * Copyright 2020 The WebRTC project authors. All Rights Reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "pc/sctp_data_channel.h"
12
Florent Castelli5183f002021-05-07 13:52:44 +020013#include <limits>
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070014#include <memory>
15#include <string>
16#include <utility>
17
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070018#include "media/sctp/sctp_transport_internal.h"
Markus Handella1b82012021-05-26 18:56:30 +020019#include "pc/proxy.h"
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070020#include "pc/sctp_utils.h"
21#include "rtc_base/checks.h"
22#include "rtc_base/location.h"
23#include "rtc_base/logging.h"
24#include "rtc_base/ref_counted_object.h"
Florent Castellidcb9ffc2021-06-29 14:58:23 +020025#include "rtc_base/system/unused.h"
Tomas Gunnarssonb774d382020-09-20 23:56:24 +020026#include "rtc_base/task_utils/to_queued_task.h"
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070027#include "rtc_base/thread.h"
28
29namespace webrtc {
30
31namespace {
32
// Upper bound, in bytes, on received data that may be queued while it cannot
// yet be delivered to an observer. Declared constexpr so the limit is a true
// compile-time constant and cannot be modified at runtime.
constexpr size_t kMaxQueuedReceivedDataBytes = 16 * 1024 * 1024;
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070034
// Process-wide counter backing the internal ids handed out to data channels.
static std::atomic<int> g_unique_id{0};

// Atomically advances the counter and returns the new (post-increment) value.
int GenerateUniqueId() {
  const int previous = g_unique_id.fetch_add(1);
  return previous + 1;
}
40
41// Define proxy for DataChannelInterface.
Mirko Bonadei9d9b8de2021-02-26 09:51:26 +010042BEGIN_PRIMARY_PROXY_MAP(DataChannel)
43PROXY_PRIMARY_THREAD_DESTRUCTOR()
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070044PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
45PROXY_METHOD0(void, UnregisterObserver)
46BYPASS_PROXY_CONSTMETHOD0(std::string, label)
47BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
48BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
49BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmitTime)
50BYPASS_PROXY_CONSTMETHOD0(uint16_t, maxRetransmits)
51BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxRetransmitsOpt)
52BYPASS_PROXY_CONSTMETHOD0(absl::optional<int>, maxPacketLifeTime)
53BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
54BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
55// Can't bypass the proxy since the id may change.
56PROXY_CONSTMETHOD0(int, id)
57BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
58PROXY_CONSTMETHOD0(DataState, state)
59PROXY_CONSTMETHOD0(RTCError, error)
60PROXY_CONSTMETHOD0(uint32_t, messages_sent)
61PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
62PROXY_CONSTMETHOD0(uint32_t, messages_received)
63PROXY_CONSTMETHOD0(uint64_t, bytes_received)
64PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
65PROXY_METHOD0(void, Close)
66// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
67PROXY_METHOD1(bool, Send, const DataBuffer&)
Markus Handell3d46d0b2021-05-27 21:42:57 +020068END_PROXY_MAP(DataChannel)
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070069
70} // namespace
71
72InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
73 : DataChannelInit(base), open_handshake_role(kOpener) {
74 // If the channel is externally negotiated, do not send the OPEN message.
75 if (base.negotiated) {
76 open_handshake_role = kNone;
77 } else {
78 // Datachannel is externally negotiated. Ignore the id value.
79 // Specified in createDataChannel, WebRTC spec section 6.1 bullet 13.
80 id = -1;
81 }
Florent Castelli5183f002021-05-07 13:52:44 +020082 // Backwards compatibility: If maxRetransmits or maxRetransmitTime
83 // are negative, the feature is not enabled.
84 // Values are clamped to a 16bit range.
85 if (maxRetransmits) {
86 if (*maxRetransmits < 0) {
87 RTC_LOG(LS_ERROR)
88 << "Accepting maxRetransmits < 0 for backwards compatibility";
89 maxRetransmits = absl::nullopt;
90 } else if (*maxRetransmits > std::numeric_limits<uint16_t>::max()) {
91 maxRetransmits = std::numeric_limits<uint16_t>::max();
92 }
Taylor Brandstetter3a034e12020-07-09 15:32:34 -070093 }
Florent Castelli5183f002021-05-07 13:52:44 +020094
95 if (maxRetransmitTime) {
96 if (*maxRetransmitTime < 0) {
97 RTC_LOG(LS_ERROR)
98 << "Accepting maxRetransmitTime < 0 for backwards compatibility";
99 maxRetransmitTime = absl::nullopt;
100 } else if (*maxRetransmitTime > std::numeric_limits<uint16_t>::max()) {
101 maxRetransmitTime = std::numeric_limits<uint16_t>::max();
102 }
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700103 }
104}
105
106bool SctpSidAllocator::AllocateSid(rtc::SSLRole role, int* sid) {
107 int potential_sid = (role == rtc::SSL_CLIENT) ? 0 : 1;
108 while (!IsSidAvailable(potential_sid)) {
109 potential_sid += 2;
110 if (potential_sid > static_cast<int>(cricket::kMaxSctpSid)) {
111 return false;
112 }
113 }
114
115 *sid = potential_sid;
116 used_sids_.insert(potential_sid);
117 return true;
118}
119
120bool SctpSidAllocator::ReserveSid(int sid) {
121 if (!IsSidAvailable(sid)) {
122 return false;
123 }
124 used_sids_.insert(sid);
125 return true;
126}
127
128void SctpSidAllocator::ReleaseSid(int sid) {
129 auto it = used_sids_.find(sid);
130 if (it != used_sids_.end()) {
131 used_sids_.erase(it);
132 }
133}
134
135bool SctpSidAllocator::IsSidAvailable(int sid) const {
136 if (sid < static_cast<int>(cricket::kMinSctpSid) ||
137 sid > static_cast<int>(cricket::kMaxSctpSid)) {
138 return false;
139 }
140 return used_sids_.find(sid) == used_sids_.end();
141}
142
143rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
144 SctpDataChannelProviderInterface* provider,
145 const std::string& label,
146 const InternalDataChannelInit& config,
147 rtc::Thread* signaling_thread,
148 rtc::Thread* network_thread) {
Tommi87f70902021-04-27 14:43:08 +0200149 auto channel = rtc::make_ref_counted<SctpDataChannel>(
150 config, provider, label, signaling_thread, network_thread);
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700151 if (!channel->Init()) {
152 return nullptr;
153 }
154 return channel;
155}
156
157// static
158rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
159 rtc::scoped_refptr<SctpDataChannel> channel) {
160 // TODO(bugs.webrtc.org/11547): incorporate the network thread in the proxy.
Tomas Gunnarsson0d5ce622022-03-18 15:57:15 +0100161 auto* signaling_thread = channel->signaling_thread_;
162 return DataChannelProxy::Create(signaling_thread, std::move(channel));
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700163}
164
165SctpDataChannel::SctpDataChannel(const InternalDataChannelInit& config,
166 SctpDataChannelProviderInterface* provider,
167 const std::string& label,
168 rtc::Thread* signaling_thread,
169 rtc::Thread* network_thread)
170 : signaling_thread_(signaling_thread),
171 network_thread_(network_thread),
172 internal_id_(GenerateUniqueId()),
173 label_(label),
174 config_(config),
175 observer_(nullptr),
176 provider_(provider) {
177 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castellidcb9ffc2021-06-29 14:58:23 +0200178 RTC_UNUSED(network_thread_);
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700179}
180
181bool SctpDataChannel::Init() {
182 RTC_DCHECK_RUN_ON(signaling_thread_);
183 if (config_.id < -1 ||
184 (config_.maxRetransmits && *config_.maxRetransmits < 0) ||
185 (config_.maxRetransmitTime && *config_.maxRetransmitTime < 0)) {
186 RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
187 "invalid DataChannelInit.";
188 return false;
189 }
190 if (config_.maxRetransmits && config_.maxRetransmitTime) {
191 RTC_LOG(LS_ERROR)
192 << "maxRetransmits and maxRetransmitTime should not be both set.";
193 return false;
194 }
195
196 switch (config_.open_handshake_role) {
197 case webrtc::InternalDataChannelInit::kNone: // pre-negotiated
198 handshake_state_ = kHandshakeReady;
199 break;
200 case webrtc::InternalDataChannelInit::kOpener:
201 handshake_state_ = kHandshakeShouldSendOpen;
202 break;
203 case webrtc::InternalDataChannelInit::kAcker:
204 handshake_state_ = kHandshakeShouldSendAck;
205 break;
206 }
207
208 // Try to connect to the transport in case the transport channel already
209 // exists.
210 OnTransportChannelCreated();
211
212 // Checks if the transport is ready to send because the initial channel
213 // ready signal may have been sent before the DataChannel creation.
214 // This has to be done async because the upper layer objects (e.g.
215 // Chrome glue and WebKit) are not wired up properly until after this
216 // function returns.
217 if (provider_->ReadyToSendData()) {
Tomas Gunnarssonb774d382020-09-20 23:56:24 +0200218 AddRef();
219 rtc::Thread::Current()->PostTask(ToQueuedTask(
220 [this] {
221 RTC_DCHECK_RUN_ON(signaling_thread_);
222 if (state_ != kClosed)
223 OnTransportReady(true);
224 },
225 [this] { Release(); }));
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700226 }
227
228 return true;
229}
230
231SctpDataChannel::~SctpDataChannel() {
232 RTC_DCHECK_RUN_ON(signaling_thread_);
233}
234
235void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
236 RTC_DCHECK_RUN_ON(signaling_thread_);
237 observer_ = observer;
238 DeliverQueuedReceivedData();
239}
240
241void SctpDataChannel::UnregisterObserver() {
242 RTC_DCHECK_RUN_ON(signaling_thread_);
243 observer_ = nullptr;
244}
245
246bool SctpDataChannel::reliable() const {
247 // May be called on any thread.
248 return !config_.maxRetransmits && !config_.maxRetransmitTime;
249}
250
251uint64_t SctpDataChannel::buffered_amount() const {
252 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castelli65956852021-10-18 11:13:22 +0200253 return queued_send_data_.byte_count();
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700254}
255
256void SctpDataChannel::Close() {
257 RTC_DCHECK_RUN_ON(signaling_thread_);
258 if (state_ == kClosed)
259 return;
260 SetState(kClosing);
261 // Will send queued data before beginning the underlying closing procedure.
262 UpdateState();
263}
264
265SctpDataChannel::DataState SctpDataChannel::state() const {
266 RTC_DCHECK_RUN_ON(signaling_thread_);
267 return state_;
268}
269
270RTCError SctpDataChannel::error() const {
271 RTC_DCHECK_RUN_ON(signaling_thread_);
272 return error_;
273}
274
275uint32_t SctpDataChannel::messages_sent() const {
276 RTC_DCHECK_RUN_ON(signaling_thread_);
277 return messages_sent_;
278}
279
280uint64_t SctpDataChannel::bytes_sent() const {
281 RTC_DCHECK_RUN_ON(signaling_thread_);
282 return bytes_sent_;
283}
284
285uint32_t SctpDataChannel::messages_received() const {
286 RTC_DCHECK_RUN_ON(signaling_thread_);
287 return messages_received_;
288}
289
290uint64_t SctpDataChannel::bytes_received() const {
291 RTC_DCHECK_RUN_ON(signaling_thread_);
292 return bytes_received_;
293}
294
295bool SctpDataChannel::Send(const DataBuffer& buffer) {
296 RTC_DCHECK_RUN_ON(signaling_thread_);
297 // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
298 // thread. Bring buffer management etc to the network thread and keep the
299 // operational state management on the signaling thread.
300
301 if (state_ != kOpen) {
302 return false;
303 }
304
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700305 // If the queue is non-empty, we're waiting for SignalReadyToSend,
306 // so just add to the end of the queue and keep waiting.
307 if (!queued_send_data_.Empty()) {
308 if (!QueueSendDataMessage(buffer)) {
Florent Castelli01343032021-11-03 16:09:46 +0100309 // Queue is full
310 return false;
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700311 }
312 return true;
313 }
314
315 SendDataMessage(buffer, true);
316
317 // Always return true for SCTP DataChannel per the spec.
318 return true;
319}
320
321void SctpDataChannel::SetSctpSid(int sid) {
322 RTC_DCHECK_RUN_ON(signaling_thread_);
323 RTC_DCHECK_LT(config_.id, 0);
324 RTC_DCHECK_GE(sid, 0);
325 RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
326 RTC_DCHECK_EQ(state_, kConnecting);
327
328 if (config_.id == sid) {
329 return;
330 }
331
332 const_cast<InternalDataChannelInit&>(config_).id = sid;
333 provider_->AddSctpDataStream(sid);
334}
335
336void SctpDataChannel::OnClosingProcedureStartedRemotely(int sid) {
337 RTC_DCHECK_RUN_ON(signaling_thread_);
338 if (sid == config_.id && state_ != kClosing && state_ != kClosed) {
339 // Don't bother sending queued data since the side that initiated the
340 // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
341 // discussion about this.
342 queued_send_data_.Clear();
343 queued_control_data_.Clear();
344 // Just need to change state to kClosing, SctpTransport will handle the
345 // rest of the closing procedure and OnClosingProcedureComplete will be
346 // called later.
347 started_closing_procedure_ = true;
348 SetState(kClosing);
349 }
350}
351
352void SctpDataChannel::OnClosingProcedureComplete(int sid) {
353 RTC_DCHECK_RUN_ON(signaling_thread_);
354 if (sid == config_.id) {
355 // If the closing procedure is complete, we should have finished sending
356 // all pending data and transitioned to kClosing already.
357 RTC_DCHECK_EQ(state_, kClosing);
358 RTC_DCHECK(queued_send_data_.Empty());
359 DisconnectFromProvider();
360 SetState(kClosed);
361 }
362}
363
364void SctpDataChannel::OnTransportChannelCreated() {
365 RTC_DCHECK_RUN_ON(signaling_thread_);
366 if (!connected_to_provider_) {
367 connected_to_provider_ = provider_->ConnectDataChannel(this);
368 }
369 // The sid may have been unassigned when provider_->ConnectDataChannel was
370 // done. So always add the streams even if connected_to_provider_ is true.
371 if (config_.id >= 0) {
372 provider_->AddSctpDataStream(config_.id);
373 }
374}
375
Florent Castellidcb9ffc2021-06-29 14:58:23 +0200376void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
377 // The SctpTransport is unusable, which could come from multiplie reasons:
378 // - the SCTP m= section was rejected
379 // - the DTLS transport is closed
380 // - the SCTP transport is closed
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700381 CloseAbruptlyWithError(std::move(error));
382}
383
384DataChannelStats SctpDataChannel::GetStats() const {
385 RTC_DCHECK_RUN_ON(signaling_thread_);
386 DataChannelStats stats{internal_id_, id(), label(),
387 protocol(), state(), messages_sent(),
388 messages_received(), bytes_sent(), bytes_received()};
389 return stats;
390}
391
392void SctpDataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
393 const rtc::CopyOnWriteBuffer& payload) {
394 RTC_DCHECK_RUN_ON(signaling_thread_);
395 if (params.sid != config_.id) {
396 return;
397 }
398
Florent Castellid95b1492021-05-10 11:29:56 +0200399 if (params.type == DataMessageType::kControl) {
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700400 if (handshake_state_ != kHandshakeWaitingForAck) {
401 // Ignore it if we are not expecting an ACK message.
402 RTC_LOG(LS_WARNING)
403 << "DataChannel received unexpected CONTROL message, sid = "
404 << params.sid;
405 return;
406 }
407 if (ParseDataChannelOpenAckMessage(payload)) {
408 // We can send unordered as soon as we receive the ACK message.
409 handshake_state_ = kHandshakeReady;
410 RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
411 << params.sid;
412 } else {
413 RTC_LOG(LS_WARNING)
414 << "DataChannel failed to parse OPEN_ACK message, sid = "
415 << params.sid;
416 }
417 return;
418 }
419
Florent Castellid95b1492021-05-10 11:29:56 +0200420 RTC_DCHECK(params.type == DataMessageType::kBinary ||
421 params.type == DataMessageType::kText);
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700422
423 RTC_LOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
424 << params.sid;
425 // We can send unordered as soon as we receive any DATA message since the
426 // remote side must have received the OPEN (and old clients do not send
427 // OPEN_ACK).
428 if (handshake_state_ == kHandshakeWaitingForAck) {
429 handshake_state_ = kHandshakeReady;
430 }
431
Florent Castellid95b1492021-05-10 11:29:56 +0200432 bool binary = (params.type == webrtc::DataMessageType::kBinary);
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700433 auto buffer = std::make_unique<DataBuffer>(payload, binary);
434 if (state_ == kOpen && observer_) {
435 ++messages_received_;
436 bytes_received_ += buffer->size();
437 observer_->OnMessage(*buffer.get());
438 } else {
439 if (queued_received_data_.byte_count() + payload.size() >
440 kMaxQueuedReceivedDataBytes) {
441 RTC_LOG(LS_ERROR) << "Queued received data exceeds the max buffer size.";
442
443 queued_received_data_.Clear();
444 CloseAbruptlyWithError(
445 RTCError(RTCErrorType::RESOURCE_EXHAUSTED,
446 "Queued received data exceeds the max buffer size."));
447
448 return;
449 }
450 queued_received_data_.PushBack(std::move(buffer));
451 }
452}
453
454void SctpDataChannel::OnTransportReady(bool writable) {
455 RTC_DCHECK_RUN_ON(signaling_thread_);
456
457 writable_ = writable;
458 if (!writable) {
459 return;
460 }
461
462 SendQueuedControlMessages();
463 SendQueuedDataMessages();
464
465 UpdateState();
466}
467
468void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
469 RTC_DCHECK_RUN_ON(signaling_thread_);
470
471 if (state_ == kClosed) {
472 return;
473 }
474
475 if (connected_to_provider_) {
476 DisconnectFromProvider();
477 }
478
479 // Closing abruptly means any queued data gets thrown away.
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700480 queued_send_data_.Clear();
481 queued_control_data_.Clear();
482
483 // Still go to "kClosing" before "kClosed", since observers may be expecting
484 // that.
485 SetState(kClosing);
486 error_ = std::move(error);
487 SetState(kClosed);
488}
489
490void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
491 const std::string& message) {
492 RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
493 error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
494 CloseAbruptlyWithError(std::move(error));
495}
496
497void SctpDataChannel::UpdateState() {
498 RTC_DCHECK_RUN_ON(signaling_thread_);
499 // UpdateState determines what to do from a few state variables. Include
500 // all conditions required for each state transition here for
501 // clarity. OnTransportReady(true) will send any queued data and then invoke
502 // UpdateState().
503
504 switch (state_) {
505 case kConnecting: {
506 if (connected_to_provider_) {
507 if (handshake_state_ == kHandshakeShouldSendOpen) {
508 rtc::CopyOnWriteBuffer payload;
509 WriteDataChannelOpenMessage(label_, config_, &payload);
510 SendControlMessage(payload);
511 } else if (handshake_state_ == kHandshakeShouldSendAck) {
512 rtc::CopyOnWriteBuffer payload;
513 WriteDataChannelOpenAckMessage(&payload);
514 SendControlMessage(payload);
515 }
516 if (writable_ && (handshake_state_ == kHandshakeReady ||
517 handshake_state_ == kHandshakeWaitingForAck)) {
518 SetState(kOpen);
519 // If we have received buffers before the channel got writable.
520 // Deliver them now.
521 DeliverQueuedReceivedData();
522 }
523 }
524 break;
525 }
526 case kOpen: {
527 break;
528 }
529 case kClosing: {
530 // Wait for all queued data to be sent before beginning the closing
531 // procedure.
532 if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
533 // For SCTP data channels, we need to wait for the closing procedure
534 // to complete; after calling RemoveSctpDataStream,
535 // OnClosingProcedureComplete will end up called asynchronously
536 // afterwards.
537 if (connected_to_provider_ && !started_closing_procedure_ &&
538 config_.id >= 0) {
539 started_closing_procedure_ = true;
540 provider_->RemoveSctpDataStream(config_.id);
541 }
542 }
543 break;
544 }
545 case kClosed:
546 break;
547 }
548}
549
550void SctpDataChannel::SetState(DataState state) {
551 RTC_DCHECK_RUN_ON(signaling_thread_);
552 if (state_ == state) {
553 return;
554 }
555
556 state_ = state;
557 if (observer_) {
558 observer_->OnStateChange();
559 }
560 if (state_ == kOpen) {
561 SignalOpened(this);
562 } else if (state_ == kClosed) {
563 SignalClosed(this);
564 }
565}
566
567void SctpDataChannel::DisconnectFromProvider() {
568 RTC_DCHECK_RUN_ON(signaling_thread_);
569 if (!connected_to_provider_)
570 return;
571
572 provider_->DisconnectDataChannel(this);
573 connected_to_provider_ = false;
574}
575
576void SctpDataChannel::DeliverQueuedReceivedData() {
577 RTC_DCHECK_RUN_ON(signaling_thread_);
578 if (!observer_) {
579 return;
580 }
581
582 while (!queued_received_data_.Empty()) {
583 std::unique_ptr<DataBuffer> buffer = queued_received_data_.PopFront();
584 ++messages_received_;
585 bytes_received_ += buffer->size();
586 observer_->OnMessage(*buffer);
587 }
588}
589
590void SctpDataChannel::SendQueuedDataMessages() {
591 RTC_DCHECK_RUN_ON(signaling_thread_);
592 if (queued_send_data_.Empty()) {
593 return;
594 }
595
596 RTC_DCHECK(state_ == kOpen || state_ == kClosing);
597
598 while (!queued_send_data_.Empty()) {
599 std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
600 if (!SendDataMessage(*buffer, false)) {
601 // Return the message to the front of the queue if sending is aborted.
602 queued_send_data_.PushFront(std::move(buffer));
603 break;
604 }
605 }
606}
607
608bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
609 bool queue_if_blocked) {
610 RTC_DCHECK_RUN_ON(signaling_thread_);
Florent Castellid95b1492021-05-10 11:29:56 +0200611 SendDataParams send_params;
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700612
613 send_params.ordered = config_.ordered;
614 // Send as ordered if it is still going through OPEN/ACK signaling.
615 if (handshake_state_ != kHandshakeReady && !config_.ordered) {
616 send_params.ordered = true;
617 RTC_LOG(LS_VERBOSE)
618 << "Sending data as ordered for unordered DataChannel "
619 "because the OPEN_ACK message has not been received.";
620 }
621
Florent Castelli5183f002021-05-07 13:52:44 +0200622 send_params.max_rtx_count = config_.maxRetransmits;
623 send_params.max_rtx_ms = config_.maxRetransmitTime;
Florent Castellid95b1492021-05-10 11:29:56 +0200624 send_params.type =
625 buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700626
627 cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
Florent Castellid95b1492021-05-10 11:29:56 +0200628 bool success =
629 provider_->SendData(config_.id, send_params, buffer.data, &send_result);
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700630
631 if (success) {
632 ++messages_sent_;
633 bytes_sent_ += buffer.size();
634
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700635 if (observer_ && buffer.size() > 0) {
636 observer_->OnBufferedAmountChange(buffer.size());
637 }
638 return true;
639 }
640
641 if (send_result == cricket::SDR_BLOCK) {
642 if (!queue_if_blocked || QueueSendDataMessage(buffer)) {
643 return false;
644 }
645 }
646 // Close the channel if the error is not SDR_BLOCK, or if queuing the
647 // message failed.
648 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
649 "send_result = "
650 << send_result;
651 CloseAbruptlyWithError(
652 RTCError(RTCErrorType::NETWORK_ERROR, "Failure to send data"));
653
654 return false;
655}
656
657bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
658 RTC_DCHECK_RUN_ON(signaling_thread_);
659 size_t start_buffered_amount = queued_send_data_.byte_count();
Florent Castellia563a2a2021-10-18 11:46:21 +0200660 if (start_buffered_amount + buffer.size() >
661 DataChannelInterface::MaxSendQueueSize()) {
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700662 RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
663 return false;
664 }
665 queued_send_data_.PushBack(std::make_unique<DataBuffer>(buffer));
666 return true;
667}
668
669void SctpDataChannel::SendQueuedControlMessages() {
670 RTC_DCHECK_RUN_ON(signaling_thread_);
671 PacketQueue control_packets;
672 control_packets.Swap(&queued_control_data_);
673
674 while (!control_packets.Empty()) {
675 std::unique_ptr<DataBuffer> buf = control_packets.PopFront();
676 SendControlMessage(buf->data);
677 }
678}
679
680void SctpDataChannel::QueueControlMessage(
681 const rtc::CopyOnWriteBuffer& buffer) {
682 RTC_DCHECK_RUN_ON(signaling_thread_);
683 queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
684}
685
686bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
687 RTC_DCHECK_RUN_ON(signaling_thread_);
688 RTC_DCHECK(writable_);
689 RTC_DCHECK_GE(config_.id, 0);
690
691 bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
692 RTC_DCHECK(!is_open_message || !config_.negotiated);
693
Florent Castellid95b1492021-05-10 11:29:56 +0200694 SendDataParams send_params;
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700695 // Send data as ordered before we receive any message from the remote peer to
696 // make sure the remote peer will not receive any data before it receives the
697 // OPEN message.
698 send_params.ordered = config_.ordered || is_open_message;
Florent Castellid95b1492021-05-10 11:29:56 +0200699 send_params.type = DataMessageType::kControl;
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700700
701 cricket::SendDataResult send_result = cricket::SDR_SUCCESS;
Florent Castellid95b1492021-05-10 11:29:56 +0200702 bool retval =
703 provider_->SendData(config_.id, send_params, buffer, &send_result);
Taylor Brandstetter3a034e12020-07-09 15:32:34 -0700704 if (retval) {
705 RTC_LOG(LS_VERBOSE) << "Sent CONTROL message on channel " << config_.id;
706
707 if (handshake_state_ == kHandshakeShouldSendAck) {
708 handshake_state_ = kHandshakeReady;
709 } else if (handshake_state_ == kHandshakeShouldSendOpen) {
710 handshake_state_ = kHandshakeWaitingForAck;
711 }
712 } else if (send_result == cricket::SDR_BLOCK) {
713 QueueControlMessage(buffer);
714 } else {
715 RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
716 " the CONTROL message, send_result = "
717 << send_result;
718 CloseAbruptlyWithError(RTCError(RTCErrorType::NETWORK_ERROR,
719 "Failed to send a CONTROL message"));
720 }
721 return retval;
722}
723
724// static
725void SctpDataChannel::ResetInternalIdAllocatorForTesting(int new_value) {
726 g_unique_id = new_value;
727}
728
729} // namespace webrtc