Refactoring DataChannelController from PeerConnection part 4
This CL:
- Moved HasDataChannel and data_channel_type_
- Moved rtp_data_channels_
- Moved sctp_data_channels_
- Moved data_channel_controller to its own .h file
- Various changes to reduce the coupling between the classes
- Removed friendship between DataChannelController and PeerConnection
Bug: webrtc:11146
Change-Id: Ib8c395e4c90ce34baf40812d1dade0ffa79f2438
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/161094
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29987}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 484886b..cc9f149 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -8,22 +8,23 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-// This file contains the implementation of the class
-// webrtc::PeerConnection::DataChannelController.
-//
-// The intent is that this should be webrtc::DataChannelController, but
-// as a migration stage, it is simpler to have it as an inner class,
-// declared in the header file pc/peer_connection.h
+#include "pc/data_channel_controller.h"
+
+#include <utility>
#include "pc/peer_connection.h"
#include "pc/sctp_utils.h"
namespace webrtc {
-bool PeerConnection::DataChannelController::SendData(
- const cricket::SendDataParams& params,
- const rtc::CopyOnWriteBuffer& payload,
- cricket::SendDataResult* result) {
+bool DataChannelController::HasDataChannels() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return !rtp_data_channels_.empty() || !sctp_data_channels_.empty();
+}
+
+bool DataChannelController::SendData(const cricket::SendDataParams& params,
+ const rtc::CopyOnWriteBuffer& payload,
+ cricket::SendDataResult* result) {
// RTC_DCHECK_RUN_ON(signaling_thread());
if (data_channel_transport()) {
SendDataParams send_params;
@@ -59,7 +60,7 @@
return false;
}
-bool PeerConnection::DataChannelController::ConnectDataChannel(
+bool DataChannelController::ConnectDataChannel(
DataChannel* webrtc_data_channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (!rtp_data_channel() && !data_channel_transport()) {
@@ -87,7 +88,7 @@
return true;
}
-void PeerConnection::DataChannelController::DisconnectDataChannel(
+void DataChannelController::DisconnectDataChannel(
DataChannel* webrtc_data_channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (!rtp_data_channel() && !data_channel_transport()) {
@@ -108,7 +109,7 @@
}
}
-void PeerConnection::DataChannelController::AddSctpDataStream(int sid) {
+void DataChannelController::AddSctpDataStream(int sid) {
if (data_channel_transport()) {
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
if (data_channel_transport()) {
@@ -118,7 +119,7 @@
}
}
-void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) {
+void DataChannelController::RemoveSctpDataStream(int sid) {
if (data_channel_transport()) {
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
if (data_channel_transport()) {
@@ -128,13 +129,13 @@
}
}
-bool PeerConnection::DataChannelController::ReadyToSendData() const {
+bool DataChannelController::ReadyToSendData() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
(data_channel_transport() && data_channel_transport_ready_to_send_);
}
-void PeerConnection::DataChannelController::OnDataReceived(
+void DataChannelController::OnDataReceived(
int channel_id,
DataMessageType type,
const rtc::CopyOnWriteBuffer& buffer) {
@@ -151,7 +152,7 @@
});
}
-void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) {
+void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
@@ -160,7 +161,7 @@
});
}
-void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) {
+void DataChannelController::OnChannelClosed(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
@@ -169,7 +170,7 @@
});
}
-void PeerConnection::DataChannelController::OnReadyToSend() {
+void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this] {
@@ -180,12 +181,12 @@
});
}
-void PeerConnection::DataChannelController::SetupDataChannelTransport_n() {
+void DataChannelController::SetupDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
}
-void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() {
+void DataChannelController::TeardownDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_ = nullptr;
if (data_channel_transport()) {
@@ -194,7 +195,7 @@
set_data_channel_transport(nullptr);
}
-void PeerConnection::DataChannelController::OnTransportChanged(
+void DataChannelController::OnTransportChanged(
DataChannelTransportInterface* new_data_channel_transport) {
RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport() &&
@@ -211,8 +212,8 @@
// necessary when bundling is applied.
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this] {
- RTC_DCHECK_RUN_ON(pc_->signaling_thread());
- for (auto channel : pc_->sctp_data_channels_) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ for (auto channel : sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
});
@@ -220,7 +221,7 @@
}
}
-bool PeerConnection::DataChannelController::HandleOpenMessage_s(
+bool DataChannelController::HandleOpenMessage_s(
const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& buffer) {
if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
@@ -241,7 +242,7 @@
return false;
}
-void PeerConnection::DataChannelController::OnDataChannelOpenMessage(
+void DataChannelController::OnDataChannelOpenMessage(
const std::string& label,
const InternalDataChannelInit& config) {
rtc::scoped_refptr<DataChannel> channel(
@@ -253,29 +254,26 @@
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
- {
- RTC_DCHECK_RUN_ON(pc_->signaling_thread());
- pc_->Observer()->OnDataChannel(std::move(proxy_channel));
- pc_->NoteUsageEvent(UsageEvent::DATA_ADDED);
- }
+ pc_->Observer()->OnDataChannel(std::move(proxy_channel));
+ pc_->NoteDataAddedEvent();
}
rtc::scoped_refptr<DataChannel>
-PeerConnection::DataChannelController::InternalCreateDataChannel(
+DataChannelController::InternalCreateDataChannel(
const std::string& label,
const InternalDataChannelInit* config) {
- RTC_DCHECK_RUN_ON(pc_->signaling_thread());
+ RTC_DCHECK_RUN_ON(signaling_thread());
if (pc_->IsClosed()) {
return nullptr;
}
- if (pc_->data_channel_type() == cricket::DCT_NONE) {
+ if (data_channel_type_ == cricket::DCT_NONE) {
RTC_LOG(LS_ERROR)
<< "InternalCreateDataChannel: Data is not supported in this call.";
return nullptr;
}
InternalDataChannelInit new_config =
config ? (*config) : InternalDataChannelInit();
- if (DataChannel::IsSctpLike(pc_->data_channel_type_)) {
+ if (DataChannel::IsSctpLike(data_channel_type_)) {
if (new_config.id < 0) {
rtc::SSLRole role;
if ((pc_->GetSctpSslRole(&role)) &&
@@ -292,36 +290,33 @@
}
rtc::scoped_refptr<DataChannel> channel(
- DataChannel::Create(this, pc_->data_channel_type(), label, new_config));
+ DataChannel::Create(this, data_channel_type(), label, new_config));
if (!channel) {
sid_allocator_.ReleaseSid(new_config.id);
return nullptr;
}
if (channel->data_channel_type() == cricket::DCT_RTP) {
- if (pc_->rtp_data_channels_.find(channel->label()) !=
- pc_->rtp_data_channels_.end()) {
+ if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) {
RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
<< " already exists.";
return nullptr;
}
- pc_->rtp_data_channels_[channel->label()] = channel;
+ rtp_data_channels_[channel->label()] = channel;
} else {
- RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_));
- pc_->sctp_data_channels_.push_back(channel);
+ RTC_DCHECK(DataChannel::IsSctpLike(data_channel_type_));
+ sctp_data_channels_.push_back(channel);
channel->SignalClosed.connect(pc_,
&PeerConnection::OnSctpDataChannelClosed);
}
-
- pc_->SignalDataChannelCreated_(channel.get());
+ SignalDataChannelCreated_(channel.get());
return channel;
}
-void PeerConnection::DataChannelController::AllocateSctpSids(
- rtc::SSLRole role) {
- RTC_DCHECK_RUN_ON(pc_->signaling_thread());
+void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<rtc::scoped_refptr<DataChannel>> channels_to_close;
- for (const auto& channel : pc_->sctp_data_channels_) {
+ for (const auto& channel : sctp_data_channels_) {
if (channel->id() < 0) {
int sid;
if (!sid_allocator_.AllocateSid(role, &sid)) {
@@ -339,11 +334,10 @@
}
}
-void PeerConnection::DataChannelController::OnSctpDataChannelClosed(
- DataChannel* channel) {
- RTC_DCHECK_RUN_ON(pc_->signaling_thread());
- for (auto it = pc_->sctp_data_channels_.begin();
- it != pc_->sctp_data_channels_.end(); ++it) {
+void DataChannelController::OnSctpDataChannelClosed(DataChannel* channel) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
+ ++it) {
if (it->get() == channel) {
if (channel->id() >= 0) {
// After the closing procedure is done, it's safe to use this ID for
@@ -352,12 +346,138 @@
}
// Since this method is triggered by a signal from the DataChannel,
// we can't free it directly here; we need to free it asynchronously.
- pc_->sctp_data_channels_to_free_.push_back(*it);
- pc_->sctp_data_channels_.erase(it);
+ sctp_data_channels_to_free_.push_back(*it);
+ sctp_data_channels_.erase(it);
pc_->SignalFreeDataChannels();
return;
}
}
}
+void DataChannelController::OnTransportChannelClosed() {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // Use a temporary copy of the RTP/SCTP DataChannel list because the
+ // DataChannel may callback to us and try to modify the list.
+ std::map<std::string, rtc::scoped_refptr<DataChannel>> temp_rtp_dcs;
+ temp_rtp_dcs.swap(rtp_data_channels_);
+ for (const auto& kv : temp_rtp_dcs) {
+ kv.second->OnTransportChannelClosed();
+ }
+
+ std::vector<rtc::scoped_refptr<DataChannel>> temp_sctp_dcs;
+ temp_sctp_dcs.swap(sctp_data_channels_);
+ for (const auto& channel : temp_sctp_dcs) {
+ channel->OnTransportChannelClosed();
+ }
+}
+
+DataChannel* DataChannelController::FindDataChannelBySid(int sid) const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ for (const auto& channel : sctp_data_channels_) {
+ if (channel->id() == sid) {
+ return channel;
+ }
+ }
+ return nullptr;
+}
+
+void DataChannelController::UpdateLocalRtpDataChannels(
+ const cricket::StreamParamsVec& streams) {
+ std::vector<std::string> existing_channels;
+
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // Find new and active data channels.
+ for (const cricket::StreamParams& params : streams) {
+ // |it->sync_label| is actually the data channel label. The reason is that
+ // we use the same naming of data channels as we do for
+ // MediaStreams and Tracks.
+ // For MediaStreams, the sync_label is the MediaStream label and the
+ // track label is the same as |streamid|.
+ const std::string& channel_label = params.first_stream_id();
+ auto data_channel_it = rtp_data_channels()->find(channel_label);
+ if (data_channel_it == rtp_data_channels()->end()) {
+ RTC_LOG(LS_ERROR) << "channel label not found";
+ continue;
+ }
+ // Set the SSRC the data channel should use for sending.
+ data_channel_it->second->SetSendSsrc(params.first_ssrc());
+ existing_channels.push_back(data_channel_it->first);
+ }
+
+ UpdateClosingRtpDataChannels(existing_channels, true);
+}
+
+void DataChannelController::UpdateRemoteRtpDataChannels(
+ const cricket::StreamParamsVec& streams) {
+ std::vector<std::string> existing_channels;
+
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // Find new and active data channels.
+ for (const cricket::StreamParams& params : streams) {
+ // The data channel label is either the mslabel or the SSRC if the mslabel
+ // does not exist. Ex a=ssrc:444330170 mslabel:test1.
+ std::string label = params.first_stream_id().empty()
+ ? rtc::ToString(params.first_ssrc())
+ : params.first_stream_id();
+ auto data_channel_it = rtp_data_channels()->find(label);
+ if (data_channel_it == rtp_data_channels()->end()) {
+ // This is a new data channel.
+ CreateRemoteRtpDataChannel(label, params.first_ssrc());
+ } else {
+ data_channel_it->second->SetReceiveSsrc(params.first_ssrc());
+ }
+ existing_channels.push_back(label);
+ }
+
+ UpdateClosingRtpDataChannels(existing_channels, false);
+}
+
+void DataChannelController::UpdateClosingRtpDataChannels(
+ const std::vector<std::string>& active_channels,
+ bool is_local_update) {
+ auto it = rtp_data_channels_.begin();
+ while (it != rtp_data_channels_.end()) {
+ DataChannel* data_channel = it->second;
+ if (absl::c_linear_search(active_channels, data_channel->label())) {
+ ++it;
+ continue;
+ }
+
+ if (is_local_update) {
+ data_channel->SetSendSsrc(0);
+ } else {
+ data_channel->RemotePeerRequestClose();
+ }
+
+ if (data_channel->state() == DataChannel::kClosed) {
+ rtp_data_channels_.erase(it);
+ it = rtp_data_channels_.begin();
+ } else {
+ ++it;
+ }
+ }
+}
+
+void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
+ uint32_t remote_ssrc) {
+ rtc::scoped_refptr<DataChannel> channel(
+ InternalCreateDataChannel(label, nullptr));
+ if (!channel.get()) {
+ RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but"
+ "CreateDataChannel failed.";
+ return;
+ }
+ channel->SetReceiveSsrc(remote_ssrc);
+ rtc::scoped_refptr<DataChannelInterface> proxy_channel =
+ DataChannelProxy::Create(signaling_thread(), channel);
+ pc_->Observer()->OnDataChannel(std::move(proxy_channel));
+}
+
+rtc::Thread* DataChannelController::network_thread() const {
+ return pc_->network_thread();
+}
+rtc::Thread* DataChannelController::signaling_thread() const {
+ return pc_->signaling_thread();
+}
+
} // namespace webrtc