Revert "Revert of Parse FlexFEC RTP headers in Call and add integration with BWE. (patchset #17 id:460001 of https://codereview.webrtc.org/2553863003/ )"

Problem fixed: RTP header extensions were not properly set in tests.

BUG=webrtc:5654

Review-Url: https://codereview.webrtc.org/2593963003
Cr-Commit-Position: refs/heads/master@{#15741}
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index 1c4b7b2..c7998b2 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -24,6 +24,7 @@
 #include "webrtc/base/checks.h"
 #include "webrtc/base/constructormagic.h"
 #include "webrtc/base/logging.h"
+#include "webrtc/base/optional.h"
 #include "webrtc/base/task_queue.h"
 #include "webrtc/base/thread_annotations.h"
 #include "webrtc/base/thread_checker.h"
@@ -39,6 +40,8 @@
 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
 #include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
 #include "webrtc/modules/rtp_rtcp/source/byte_io.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_header_extension.h"
+#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
 #include "webrtc/modules/utility/include/process_thread.h"
 #include "webrtc/system_wrappers/include/clock.h"
 #include "webrtc/system_wrappers/include/cpu_info.h"
@@ -107,6 +110,8 @@
   // Implements RecoveredPacketReceiver.
   bool OnRecoveredPacket(const uint8_t* packet, size_t length) override;
 
+  void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet);
+
   void SetBitrateConfig(
       const webrtc::Call::Config::BitrateConfig& bitrate_config) override;
 
@@ -154,6 +159,11 @@
       return nullptr;
   }
 
+  rtc::Optional<RtpPacketReceived> ParseRtpPacket(const uint8_t* packet,
+                                                  size_t length,
+                                                  const PacketTime& packet_time)
+      SHARED_LOCKS_REQUIRED(receive_crit_);
+
   void UpdateSendHistograms() EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_);
   void UpdateReceiveHistograms();
   void UpdateHistograms();
@@ -192,6 +202,14 @@
   std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
       GUARDED_BY(receive_crit_);
 
+  // Registered RTP header extensions for each stream.
+  // Note that RTP header extensions are negotiated per track ("m= line") in the
+  // SDP, but we have no notion of tracks at the Call level. We therefore store
+  // the RTP header extensions per SSRC instead, which leads to some storage
+  // overhead.
+  std::map<uint32_t, RtpHeaderExtensionMap> received_rtp_header_extensions_
+      GUARDED_BY(receive_crit_);
+
   std::unique_ptr<RWLockWrapper> send_crit_;
   // Audio and Video send streams are owned by the client that creates them.
   std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_ GUARDED_BY(send_crit_);
@@ -345,6 +363,29 @@
   Trace::ReturnTrace();
 }
 
+rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
+    const uint8_t* packet,
+    size_t length,
+    const PacketTime& packet_time) {
+  // An empty Optional is returned when |packet| fails RTP parsing.
+  RtpPacketReceived rtp_packet;
+  if (!rtp_packet.Parse(packet, length))
+    return rtc::Optional<RtpPacketReceived>();
+
+  // Apply the header extension map stored for this SSRC, if there is one.
+  const auto extensions_it =
+      received_rtp_header_extensions_.find(rtp_packet.Ssrc());
+  if (extensions_it != received_rtp_header_extensions_.end())
+    rtp_packet.IdentifyExtensions(extensions_it->second);
+
+  // Timestamp -1: use the local clock. Otherwise divide by 1000, rounded.
+  const bool has_timestamp = packet_time.timestamp != -1;
+  rtp_packet.set_arrival_time_ms(has_timestamp
+                                     ? (packet_time.timestamp + 500) / 1000
+                                     : clock_->TimeInMilliseconds());
+  return rtc::Optional<RtpPacketReceived>(std::move(rtp_packet));
+}
+
 void Call::UpdateHistograms() {
   RTC_HISTOGRAM_COUNTS_100000(
       "WebRTC.Call.LifetimeInSeconds",
@@ -659,25 +700,40 @@
     const FlexfecReceiveStream::Config& config) {
   TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
   RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+
+  RecoveredPacketReceiver* recovered_packet_receiver = this;
   FlexfecReceiveStreamImpl* receive_stream =
-      new FlexfecReceiveStreamImpl(config, this);
+      new FlexfecReceiveStreamImpl(config, recovered_packet_receiver);
 
   {
     WriteLockScoped write_lock(*receive_crit_);
+
+    RTC_DCHECK(flexfec_receive_streams_.find(receive_stream) ==
+               flexfec_receive_streams_.end());
+    flexfec_receive_streams_.insert(receive_stream);
+
     for (auto ssrc : config.protected_media_ssrcs)
       flexfec_receive_ssrcs_media_.insert(std::make_pair(ssrc, receive_stream));
+
     RTC_DCHECK(flexfec_receive_ssrcs_protection_.find(config.remote_ssrc) ==
                flexfec_receive_ssrcs_protection_.end());
     flexfec_receive_ssrcs_protection_[config.remote_ssrc] = receive_stream;
-    flexfec_receive_streams_.insert(receive_stream);
+
+    RTC_DCHECK(received_rtp_header_extensions_.find(config.remote_ssrc) ==
+               received_rtp_header_extensions_.end());
+    RtpHeaderExtensionMap rtp_header_extensions(config.rtp_header_extensions);
+    received_rtp_header_extensions_[config.remote_ssrc] = rtp_header_extensions;
   }
+
   // TODO(brandtr): Store config in RtcEventLog here.
+
   return receive_stream;
 }
 
 void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
   TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
   RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread());
+
   RTC_DCHECK(receive_stream != nullptr);
   // There exist no other derived classes of FlexfecReceiveStream,
   // so this downcast is safe.
@@ -685,15 +741,12 @@
       static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
   {
     WriteLockScoped write_lock(*receive_crit_);
+
+    uint32_t ssrc = receive_stream_impl->GetConfig().remote_ssrc;
+    received_rtp_header_extensions_.erase(ssrc);
+
     // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
     // destroyed.
-    auto media_it = flexfec_receive_ssrcs_media_.begin();
-    while (media_it != flexfec_receive_ssrcs_media_.end()) {
-      if (media_it->second == receive_stream_impl)
-        media_it = flexfec_receive_ssrcs_media_.erase(media_it);
-      else
-        ++media_it;
-    }
     auto prot_it = flexfec_receive_ssrcs_protection_.begin();
     while (prot_it != flexfec_receive_ssrcs_protection_.end()) {
       if (prot_it->second == receive_stream_impl)
@@ -701,8 +754,17 @@
       else
         ++prot_it;
     }
+    auto media_it = flexfec_receive_ssrcs_media_.begin();
+    while (media_it != flexfec_receive_ssrcs_media_.end()) {
+      if (media_it->second == receive_stream_impl)
+        media_it = flexfec_receive_ssrcs_media_.erase(media_it);
+      else
+        ++media_it;
+    }
+
     flexfec_receive_streams_.erase(receive_stream_impl);
   }
+
   delete receive_stream_impl;
 }
 
@@ -1076,13 +1138,21 @@
     if (it != video_receive_ssrcs_.end()) {
       received_bytes_per_second_counter_.Add(static_cast<int>(length));
       received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
+      // TODO(brandtr): Notify the BWE of received media packets here.
       auto status = it->second->DeliverRtp(packet, length, packet_time)
                         ? DELIVERY_OK
                         : DELIVERY_PACKET_ERROR;
-      // Deliver media packets to FlexFEC subsystem.
-      auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
-      for (auto it = it_bounds.first; it != it_bounds.second; ++it)
-        it->second->AddAndProcessReceivedPacket(packet, length);
+      // Deliver media packets to FlexFEC subsystem. RTP header extensions need
+      // not be parsed, as FlexFEC is oblivious to the semantic meaning of the
+      // packet contents beyond the 12 byte RTP base header. The BWE is fed
+      // information about these media packets from the regular media pipeline.
+      rtc::Optional<RtpPacketReceived> parsed_packet =
+          ParseRtpPacket(packet, length, packet_time);
+      if (parsed_packet) {
+        auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
+        for (auto it = it_bounds.first; it != it_bounds.second; ++it)
+          it->second->AddAndProcessReceivedPacket(*parsed_packet);
+      }
       if (status == DELIVERY_OK)
         event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
       return status;
@@ -1091,12 +1161,18 @@
   if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
     auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
     if (it != flexfec_receive_ssrcs_protection_.end()) {
-      auto status = it->second->AddAndProcessReceivedPacket(packet, length)
-                        ? DELIVERY_OK
-                        : DELIVERY_PACKET_ERROR;
-      if (status == DELIVERY_OK)
-        event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
-      return status;
+      rtc::Optional<RtpPacketReceived> parsed_packet =
+          ParseRtpPacket(packet, length, packet_time);
+      if (parsed_packet) {
+        NotifyBweOfReceivedPacket(*parsed_packet);
+        auto status =
+            it->second->AddAndProcessReceivedPacket(std::move(*parsed_packet))
+                ? DELIVERY_OK
+                : DELIVERY_PACKET_ERROR;
+        if (status == DELIVERY_OK)
+          event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
+        return status;
+      }
     }
   }
   return DELIVERY_UNKNOWN_SSRC;
@@ -1128,5 +1204,12 @@
   return it->second->OnRecoveredPacket(packet, length);
 }
 
+void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet) {
+  RTPHeader rtp_header;  // Filled in from |packet| on the next line.
+  packet.GetHeader(&rtp_header);
+  congestion_controller_->OnReceivedPacket(
+      packet.arrival_time_ms(), packet.payload_size(), rtp_header);
+}
+
 }  // namespace internal
 }  // namespace webrtc