Refactoring DataChannelController from PeerConnection part 4

This CL:
- Moved HasDataChannel and data_channel_type_
- Moved rtp_data_channels_
- Moved sctp_data_channels_
- Moved data_channel_controller to its own .h file
- Various changes to reduce the coupling between the classes
- Removed friendship between DataChannelController and PeerConnection

Bug: webrtc:11146
Change-Id: Ib8c395e4c90ce34baf40812d1dade0ffa79f2438
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/161094
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29987}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 484886b..cc9f149 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -8,22 +8,23 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
-// This file contains the implementation of the class
-// webrtc::PeerConnection::DataChannelController.
-//
-// The intent is that this should be webrtc::DataChannelController, but
-// as a migration stage, it is simpler to have it as an inner class,
-// declared in the header file pc/peer_connection.h
+#include "pc/data_channel_controller.h"
+
+#include <utility>
 
 #include "pc/peer_connection.h"
 #include "pc/sctp_utils.h"
 
 namespace webrtc {
 
-bool PeerConnection::DataChannelController::SendData(
-    const cricket::SendDataParams& params,
-    const rtc::CopyOnWriteBuffer& payload,
-    cricket::SendDataResult* result) {
+bool DataChannelController::HasDataChannels() const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return !rtp_data_channels_.empty() || !sctp_data_channels_.empty();
+}
+
+bool DataChannelController::SendData(const cricket::SendDataParams& params,
+                                     const rtc::CopyOnWriteBuffer& payload,
+                                     cricket::SendDataResult* result) {
   // RTC_DCHECK_RUN_ON(signaling_thread());
   if (data_channel_transport()) {
     SendDataParams send_params;
@@ -59,7 +60,7 @@
   return false;
 }
 
-bool PeerConnection::DataChannelController::ConnectDataChannel(
+bool DataChannelController::ConnectDataChannel(
     DataChannel* webrtc_data_channel) {
   RTC_DCHECK_RUN_ON(signaling_thread());
   if (!rtp_data_channel() && !data_channel_transport()) {
@@ -87,7 +88,7 @@
   return true;
 }
 
-void PeerConnection::DataChannelController::DisconnectDataChannel(
+void DataChannelController::DisconnectDataChannel(
     DataChannel* webrtc_data_channel) {
   RTC_DCHECK_RUN_ON(signaling_thread());
   if (!rtp_data_channel() && !data_channel_transport()) {
@@ -108,7 +109,7 @@
   }
 }
 
-void PeerConnection::DataChannelController::AddSctpDataStream(int sid) {
+void DataChannelController::AddSctpDataStream(int sid) {
   if (data_channel_transport()) {
     network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
       if (data_channel_transport()) {
@@ -118,7 +119,7 @@
   }
 }
 
-void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) {
+void DataChannelController::RemoveSctpDataStream(int sid) {
   if (data_channel_transport()) {
     network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
       if (data_channel_transport()) {
@@ -128,13 +129,13 @@
   }
 }
 
-bool PeerConnection::DataChannelController::ReadyToSendData() const {
+bool DataChannelController::ReadyToSendData() const {
   RTC_DCHECK_RUN_ON(signaling_thread());
   return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
          (data_channel_transport() && data_channel_transport_ready_to_send_);
 }
 
-void PeerConnection::DataChannelController::OnDataReceived(
+void DataChannelController::OnDataReceived(
     int channel_id,
     DataMessageType type,
     const rtc::CopyOnWriteBuffer& buffer) {
@@ -151,7 +152,7 @@
       });
 }
 
-void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) {
+void DataChannelController::OnChannelClosing(int channel_id) {
   RTC_DCHECK_RUN_ON(network_thread());
   data_channel_transport_invoker_->AsyncInvoke<void>(
       RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
@@ -160,7 +161,7 @@
       });
 }
 
-void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) {
+void DataChannelController::OnChannelClosed(int channel_id) {
   RTC_DCHECK_RUN_ON(network_thread());
   data_channel_transport_invoker_->AsyncInvoke<void>(
       RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
@@ -169,7 +170,7 @@
       });
 }
 
-void PeerConnection::DataChannelController::OnReadyToSend() {
+void DataChannelController::OnReadyToSend() {
   RTC_DCHECK_RUN_ON(network_thread());
   data_channel_transport_invoker_->AsyncInvoke<void>(
       RTC_FROM_HERE, signaling_thread(), [this] {
@@ -180,12 +181,12 @@
       });
 }
 
-void PeerConnection::DataChannelController::SetupDataChannelTransport_n() {
+void DataChannelController::SetupDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
   data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
 }
 
-void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() {
+void DataChannelController::TeardownDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
   data_channel_transport_invoker_ = nullptr;
   if (data_channel_transport()) {
@@ -194,7 +195,7 @@
   set_data_channel_transport(nullptr);
 }
 
-void PeerConnection::DataChannelController::OnTransportChanged(
+void DataChannelController::OnTransportChanged(
     DataChannelTransportInterface* new_data_channel_transport) {
   RTC_DCHECK_RUN_ON(network_thread());
   if (data_channel_transport() &&
@@ -211,8 +212,8 @@
       // necessary when bundling is applied.
       data_channel_transport_invoker_->AsyncInvoke<void>(
           RTC_FROM_HERE, signaling_thread(), [this] {
-            RTC_DCHECK_RUN_ON(pc_->signaling_thread());
-            for (auto channel : pc_->sctp_data_channels_) {
+            RTC_DCHECK_RUN_ON(signaling_thread());
+            for (auto channel : sctp_data_channels_) {
               channel->OnTransportChannelCreated();
             }
           });
@@ -220,7 +221,7 @@
   }
 }
 
-bool PeerConnection::DataChannelController::HandleOpenMessage_s(
+bool DataChannelController::HandleOpenMessage_s(
     const cricket::ReceiveDataParams& params,
     const rtc::CopyOnWriteBuffer& buffer) {
   if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
@@ -241,7 +242,7 @@
   return false;
 }
 
-void PeerConnection::DataChannelController::OnDataChannelOpenMessage(
+void DataChannelController::OnDataChannelOpenMessage(
     const std::string& label,
     const InternalDataChannelInit& config) {
   rtc::scoped_refptr<DataChannel> channel(
@@ -253,29 +254,26 @@
 
   rtc::scoped_refptr<DataChannelInterface> proxy_channel =
       DataChannelProxy::Create(signaling_thread(), channel);
-  {
-    RTC_DCHECK_RUN_ON(pc_->signaling_thread());
-    pc_->Observer()->OnDataChannel(std::move(proxy_channel));
-    pc_->NoteUsageEvent(UsageEvent::DATA_ADDED);
-  }
+  pc_->Observer()->OnDataChannel(std::move(proxy_channel));
+  pc_->NoteDataAddedEvent();
 }
 
 rtc::scoped_refptr<DataChannel>
-PeerConnection::DataChannelController::InternalCreateDataChannel(
+DataChannelController::InternalCreateDataChannel(
     const std::string& label,
     const InternalDataChannelInit* config) {
-  RTC_DCHECK_RUN_ON(pc_->signaling_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   if (pc_->IsClosed()) {
     return nullptr;
   }
-  if (pc_->data_channel_type() == cricket::DCT_NONE) {
+  if (data_channel_type_ == cricket::DCT_NONE) {
     RTC_LOG(LS_ERROR)
         << "InternalCreateDataChannel: Data is not supported in this call.";
     return nullptr;
   }
   InternalDataChannelInit new_config =
       config ? (*config) : InternalDataChannelInit();
-  if (DataChannel::IsSctpLike(pc_->data_channel_type_)) {
+  if (DataChannel::IsSctpLike(data_channel_type_)) {
     if (new_config.id < 0) {
       rtc::SSLRole role;
       if ((pc_->GetSctpSslRole(&role)) &&
@@ -292,36 +290,33 @@
   }
 
   rtc::scoped_refptr<DataChannel> channel(
-      DataChannel::Create(this, pc_->data_channel_type(), label, new_config));
+      DataChannel::Create(this, data_channel_type(), label, new_config));
   if (!channel) {
     sid_allocator_.ReleaseSid(new_config.id);
     return nullptr;
   }
 
   if (channel->data_channel_type() == cricket::DCT_RTP) {
-    if (pc_->rtp_data_channels_.find(channel->label()) !=
-        pc_->rtp_data_channels_.end()) {
+    if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) {
       RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
                         << " already exists.";
       return nullptr;
     }
-    pc_->rtp_data_channels_[channel->label()] = channel;
+    rtp_data_channels_[channel->label()] = channel;
   } else {
-    RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_));
-    pc_->sctp_data_channels_.push_back(channel);
+    RTC_DCHECK(DataChannel::IsSctpLike(data_channel_type_));
+    sctp_data_channels_.push_back(channel);
     channel->SignalClosed.connect(pc_,
                                   &PeerConnection::OnSctpDataChannelClosed);
   }
-
-  pc_->SignalDataChannelCreated_(channel.get());
+  SignalDataChannelCreated_(channel.get());
   return channel;
 }
 
-void PeerConnection::DataChannelController::AllocateSctpSids(
-    rtc::SSLRole role) {
-  RTC_DCHECK_RUN_ON(pc_->signaling_thread());
+void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
   std::vector<rtc::scoped_refptr<DataChannel>> channels_to_close;
-  for (const auto& channel : pc_->sctp_data_channels_) {
+  for (const auto& channel : sctp_data_channels_) {
     if (channel->id() < 0) {
       int sid;
       if (!sid_allocator_.AllocateSid(role, &sid)) {
@@ -339,11 +334,10 @@
   }
 }
 
-void PeerConnection::DataChannelController::OnSctpDataChannelClosed(
-    DataChannel* channel) {
-  RTC_DCHECK_RUN_ON(pc_->signaling_thread());
-  for (auto it = pc_->sctp_data_channels_.begin();
-       it != pc_->sctp_data_channels_.end(); ++it) {
+void DataChannelController::OnSctpDataChannelClosed(DataChannel* channel) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
+       ++it) {
     if (it->get() == channel) {
       if (channel->id() >= 0) {
         // After the closing procedure is done, it's safe to use this ID for
@@ -352,12 +346,138 @@
       }
       // Since this method is triggered by a signal from the DataChannel,
       // we can't free it directly here; we need to free it asynchronously.
-      pc_->sctp_data_channels_to_free_.push_back(*it);
-      pc_->sctp_data_channels_.erase(it);
+      sctp_data_channels_to_free_.push_back(*it);
+      sctp_data_channels_.erase(it);
       pc_->SignalFreeDataChannels();
       return;
     }
   }
 }
 
+void DataChannelController::OnTransportChannelClosed() {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  // Use a temporary copy of the RTP/SCTP DataChannel list because the
+  // DataChannel may callback to us and try to modify the list.
+  std::map<std::string, rtc::scoped_refptr<DataChannel>> temp_rtp_dcs;
+  temp_rtp_dcs.swap(rtp_data_channels_);
+  for (const auto& kv : temp_rtp_dcs) {
+    kv.second->OnTransportChannelClosed();
+  }
+
+  std::vector<rtc::scoped_refptr<DataChannel>> temp_sctp_dcs;
+  temp_sctp_dcs.swap(sctp_data_channels_);
+  for (const auto& channel : temp_sctp_dcs) {
+    channel->OnTransportChannelClosed();
+  }
+}
+
+DataChannel* DataChannelController::FindDataChannelBySid(int sid) const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  for (const auto& channel : sctp_data_channels_) {
+    if (channel->id() == sid) {
+      return channel;
+    }
+  }
+  return nullptr;
+}
+
+void DataChannelController::UpdateLocalRtpDataChannels(
+    const cricket::StreamParamsVec& streams) {
+  std::vector<std::string> existing_channels;
+
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  // Find new and active data channels.
+  for (const cricket::StreamParams& params : streams) {
+    // |it->sync_label| is actually the data channel label. The reason is that
+    // we use the same naming of data channels as we do for
+    // MediaStreams and Tracks.
+    // For MediaStreams, the sync_label is the MediaStream label and the
+    // track label is the same as |streamid|.
+    const std::string& channel_label = params.first_stream_id();
+    auto data_channel_it = rtp_data_channels()->find(channel_label);
+    if (data_channel_it == rtp_data_channels()->end()) {
+      RTC_LOG(LS_ERROR) << "channel label not found";
+      continue;
+    }
+    // Set the SSRC the data channel should use for sending.
+    data_channel_it->second->SetSendSsrc(params.first_ssrc());
+    existing_channels.push_back(data_channel_it->first);
+  }
+
+  UpdateClosingRtpDataChannels(existing_channels, true);
+}
+
+void DataChannelController::UpdateRemoteRtpDataChannels(
+    const cricket::StreamParamsVec& streams) {
+  std::vector<std::string> existing_channels;
+
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  // Find new and active data channels.
+  for (const cricket::StreamParams& params : streams) {
+    // The data channel label is either the mslabel or the SSRC if the mslabel
+    // does not exist. Ex a=ssrc:444330170 mslabel:test1.
+    std::string label = params.first_stream_id().empty()
+                            ? rtc::ToString(params.first_ssrc())
+                            : params.first_stream_id();
+    auto data_channel_it = rtp_data_channels()->find(label);
+    if (data_channel_it == rtp_data_channels()->end()) {
+      // This is a new data channel.
+      CreateRemoteRtpDataChannel(label, params.first_ssrc());
+    } else {
+      data_channel_it->second->SetReceiveSsrc(params.first_ssrc());
+    }
+    existing_channels.push_back(label);
+  }
+
+  UpdateClosingRtpDataChannels(existing_channels, false);
+}
+
+void DataChannelController::UpdateClosingRtpDataChannels(
+    const std::vector<std::string>& active_channels,
+    bool is_local_update) {
+  auto it = rtp_data_channels_.begin();
+  while (it != rtp_data_channels_.end()) {
+    DataChannel* data_channel = it->second;
+    if (absl::c_linear_search(active_channels, data_channel->label())) {
+      ++it;
+      continue;
+    }
+
+    if (is_local_update) {
+      data_channel->SetSendSsrc(0);
+    } else {
+      data_channel->RemotePeerRequestClose();
+    }
+
+    if (data_channel->state() == DataChannel::kClosed) {
+      rtp_data_channels_.erase(it);
+      it = rtp_data_channels_.begin();
+    } else {
+      ++it;
+    }
+  }
+}
+
+void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
+                                                       uint32_t remote_ssrc) {
+  rtc::scoped_refptr<DataChannel> channel(
+      InternalCreateDataChannel(label, nullptr));
+  if (!channel.get()) {
+    RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but"
+                           "CreateDataChannel failed.";
+    return;
+  }
+  channel->SetReceiveSsrc(remote_ssrc);
+  rtc::scoped_refptr<DataChannelInterface> proxy_channel =
+      DataChannelProxy::Create(signaling_thread(), channel);
+  pc_->Observer()->OnDataChannel(std::move(proxy_channel));
+}
+
+rtc::Thread* DataChannelController::network_thread() const {
+  return pc_->network_thread();
+}
+rtc::Thread* DataChannelController::signaling_thread() const {
+  return pc_->signaling_thread();
+}
+
 }  // namespace webrtc