Allow RTP module thread checking to know PacketRouter status.

Since https://webrtc-review.googlesource.com/c/src/+/228433 both audio
and video now only call Get/SetRtpState while not registered to the
packet router.

We can thus remove the lock around packet sequencer and just use a
thread checker.

Bug: webrtc:11340
Change-Id: Ie6865cc96c36208700c31a75747ff4dd992ce68d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228435
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34755}
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 7f0e9da..f95c743 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -451,6 +451,15 @@
   // Signal congestion controller this object is ready for OnPacket* callbacks.
   transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
       rtp_config_.ssrcs, this);
+
+  // Construction happens on the worker thread (see Call::CreateVideoSendStream)
+  // but subseqeuent calls to the RTP state will happen on one of two threads:
+  // * The pacer thread for actually sending packets.
+  // * The transport thread when tearing down and quering GetRtpState().
+  // Detach thread checkers.
+  for (const RtpStreamSender& stream : rtp_streams_) {
+    stream.rtp_rtcp->OnPacketSendingThreadSwitched();
+  }
 }
 
 RtpVideoSender::~RtpVideoSender() {
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index 3b1278e..fcc7ee3 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -68,6 +68,10 @@
 void PacketRouter::AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module,
                                          uint32_t ssrc) {
   RTC_DCHECK(send_modules_map_.find(ssrc) == send_modules_map_.end());
+
+  // Signal to module that the pacer thread is attached and can send packets.
+  rtp_module->OnPacketSendingThreadSwitched();
+
   // Always keep the audio modules at the back of the list, so that when we
   // iterate over the modules in order to find one that can send padding we
   // will prioritize video. This is important to make sure they are counted
@@ -102,6 +106,7 @@
   if (last_send_module_ == rtp_module) {
     last_send_module_ = nullptr;
   }
+  rtp_module->OnPacketSendingThreadSwitched();
 }
 
 void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
diff --git a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
index 59b8cf1..33c5a9b 100644
--- a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
+++ b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
@@ -115,6 +115,7 @@
               (rtc::ArrayView<const uint16_t> sequence_numbers),
               (const, override));
   MOCK_METHOD(size_t, ExpectedPerPacketOverhead, (), (const, override));
+  MOCK_METHOD(void, OnPacketSendingThreadSwitched, (), (override));
   MOCK_METHOD(RtcpMode, RTCP, (), (const, override));
   MOCK_METHOD(void, SetRTCPStatus, (RtcpMode method), (override));
   MOCK_METHOD(int32_t,
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
index f6f9afb..34e8429 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.cc
@@ -490,6 +490,8 @@
   return rtp_sender_->packet_generator.ExpectedPerPacketOverhead();
 }
 
+void ModuleRtpRtcpImpl::OnPacketSendingThreadSwitched() {}
+
 size_t ModuleRtpRtcpImpl::MaxRtpPacketSize() const {
   RTC_DCHECK(rtp_sender_);
   return rtp_sender_->packet_generator.MaxRtpPacketSize();
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
index 81b1170..2ffe013 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
@@ -157,6 +157,8 @@
 
   size_t ExpectedPerPacketOverhead() const override;
 
+  void OnPacketSendingThreadSwitched() override;
+
   // RTCP part.
 
   // Get RTCP status.
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
index ec42f61..7d33ca0 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
@@ -64,22 +64,22 @@
     const RtpRtcpInterface::Configuration& config)
     : packet_history(config.clock, config.enable_rtx_padding_prioritization),
       deferred_sequencing_(config.use_deferred_sequencing),
-      sequencer_(config.local_media_ssrc,
-                 config.rtx_send_ssrc,
-                 /*require_marker_before_media_padding=*/!config.audio,
-                 config.clock),
+      sequencer(config.local_media_ssrc,
+                config.rtx_send_ssrc,
+                /*require_marker_before_media_padding=*/!config.audio,
+                config.clock),
       packet_sender(config, &packet_history),
       non_paced_sender(&packet_sender, this, config.use_deferred_sequencing),
       packet_generator(
           config,
           &packet_history,
           config.paced_sender ? config.paced_sender : &non_paced_sender,
-          config.use_deferred_sequencing ? nullptr : &sequencer_) {}
+          config.use_deferred_sequencing ? nullptr : &sequencer) {}
 void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber(
     RtpPacketToSend* packet) {
+  RTC_DCHECK_RUN_ON(&sequencing_checker);
   if (deferred_sequencing_) {
-    MutexLock lock(&mutex_sequencer_);
-    sequencer_.Sequence(*packet);
+    sequencer.Sequence(*packet);
   } else {
     packet_generator.AssignSequenceNumber(packet);
   }
@@ -101,10 +101,10 @@
       rtt_stats_(configuration.rtt_stats),
       rtt_ms_(0) {
   RTC_DCHECK(worker_queue_);
-  packet_sequence_checker_.Detach();
-  pacer_thread_checker_.Detach();
+  rtcp_thread_checker_.Detach();
   if (!configuration.receiver_only) {
     rtp_sender_ = std::make_unique<RtpSenderContext>(configuration);
+    rtp_sender_->sequencing_checker.Detach();
     // Make sure rtcp sender use same timestamp offset as rtp sender.
     rtcp_sender_.SetTimestampOffset(
         rtp_sender_->packet_generator.TimestampOffset());
@@ -162,7 +162,7 @@
 
 void ModuleRtpRtcpImpl2::IncomingRtcpPacket(const uint8_t* rtcp_packet,
                                             const size_t length) {
-  RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
+  RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
   rtcp_receiver_.IncomingPacket(rtcp_packet, length);
 }
 
@@ -187,20 +187,19 @@
 }
 
 uint16_t ModuleRtpRtcpImpl2::SequenceNumber() const {
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   if (rtp_sender_->deferred_sequencing_) {
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
-    return rtp_sender_->sequencer_.media_sequence_number();
+    return rtp_sender_->sequencer.media_sequence_number();
   }
   return rtp_sender_->packet_generator.SequenceNumber();
 }
 
 // Set SequenceNumber, default is a random number.
 void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) {
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   if (rtp_sender_->deferred_sequencing_) {
-    RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
-    if (rtp_sender_->sequencer_.media_sequence_number() != seq_num) {
-      rtp_sender_->sequencer_.set_media_sequence_number(seq_num);
+    if (rtp_sender_->sequencer.media_sequence_number() != seq_num) {
+      rtp_sender_->sequencer.set_media_sequence_number(seq_num);
       rtp_sender_->packet_history.Clear();
     }
   } else {
@@ -209,36 +208,36 @@
 }
 
 void ModuleRtpRtcpImpl2::SetRtpState(const RtpState& rtp_state) {
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   rtp_sender_->packet_generator.SetRtpState(rtp_state);
   if (rtp_sender_->deferred_sequencing_) {
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
-    rtp_sender_->sequencer_.SetRtpState(rtp_state);
+    rtp_sender_->sequencer.SetRtpState(rtp_state);
   }
   rtcp_sender_.SetTimestampOffset(rtp_state.start_timestamp);
 }
 
 void ModuleRtpRtcpImpl2::SetRtxState(const RtpState& rtp_state) {
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   rtp_sender_->packet_generator.SetRtxRtpState(rtp_state);
   if (rtp_sender_->deferred_sequencing_) {
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
-    rtp_sender_->sequencer_.set_rtx_sequence_number(rtp_state.sequence_number);
+    rtp_sender_->sequencer.set_rtx_sequence_number(rtp_state.sequence_number);
   }
 }
 
 RtpState ModuleRtpRtcpImpl2::GetRtpState() const {
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   RtpState state = rtp_sender_->packet_generator.GetRtpState();
   if (rtp_sender_->deferred_sequencing_) {
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
-    rtp_sender_->sequencer_.PopulateRtpState(state);
+    rtp_sender_->sequencer.PopulateRtpState(state);
   }
   return state;
 }
 
 RtpState ModuleRtpRtcpImpl2::GetRtxState() const {
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   RtpState state = rtp_sender_->packet_generator.GetRtxRtpState();
   if (rtp_sender_->deferred_sequencing_) {
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
-    state.sequence_number = rtp_sender_->sequencer_.rtx_sequence_number();
+    state.sequence_number = rtp_sender_->sequencer.rtx_sequence_number();
   }
   return state;
 }
@@ -249,7 +248,7 @@
 }
 
 uint32_t ModuleRtpRtcpImpl2::local_media_ssrc() const {
-  RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
+  RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
   RTC_DCHECK_EQ(rtcp_receiver_.local_media_ssrc(), rtcp_sender_.SSRC());
   return rtcp_receiver_.local_media_ssrc();
 }
@@ -335,12 +334,6 @@
 // updated.
 void ModuleRtpRtcpImpl2::SetSendingMediaStatus(const bool sending) {
   if (rtp_sender_) {
-    // Turning on or off sending status indicates module being set
-    // up or torn down, detach thread checker since subsequent calls
-    // may be from a different thread.
-    if (rtp_sender_->packet_generator.SendingMedia() != sending) {
-      pacer_thread_checker_.Detach();
-    }
     rtp_sender_->packet_generator.SetSendingMediaStatus(sending);
   } else {
     RTC_DCHECK(!sending);
@@ -389,15 +382,14 @@
 bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet,
                                        const PacedPacketInfo& pacing_info) {
   RTC_DCHECK(rtp_sender_);
-  RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   if (rtp_sender_->deferred_sequencing_) {
     if (!rtp_sender_->packet_generator.SendingMedia()) {
       return false;
     }
-    MutexLock lock(&rtp_sender_->mutex_sequencer_);
     if (packet->packet_type() == RtpPacketMediaType::kPadding &&
         packet->Ssrc() == rtp_sender_->packet_generator.SSRC() &&
-        !rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()) {
+        !rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()) {
       // New media packet preempted this generated padding packet, discard it.
       return false;
     }
@@ -405,7 +397,7 @@
         packet->packet_type() == RtpPacketMediaType::kForwardErrorCorrection &&
         packet->Ssrc() == rtp_sender_->packet_generator.FlexfecSsrc();
     if (!is_flexfec) {
-      rtp_sender_->sequencer_.Sequence(*packet);
+      rtp_sender_->sequencer.Sequence(*packet);
     }
   } else if (!rtp_sender_->packet_generator.SendingMedia()) {
     return false;
@@ -425,7 +417,7 @@
 std::vector<std::unique_ptr<RtpPacketToSend>>
 ModuleRtpRtcpImpl2::FetchFecPackets() {
   RTC_DCHECK(rtp_sender_);
-  RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
   auto fec_packets = rtp_sender_->packet_sender.FetchFecPackets();
   if (!fec_packets.empty() && !rtp_sender_->deferred_sequencing_) {
     // Only assign sequence numbers for FEC packets in non-deferred mode, and
@@ -460,17 +452,15 @@
 std::vector<std::unique_ptr<RtpPacketToSend>>
 ModuleRtpRtcpImpl2::GeneratePadding(size_t target_size_bytes) {
   RTC_DCHECK(rtp_sender_);
-  RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
-  MutexLock lock(&rtp_sender_->mutex_sequencer_);
+  RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
 
   // `can_send_padding_on_media_ssrc` set to false when deferred sequencing
   // is off. It will be ignored in that case, RTPSender will internally query
-  // `sequencer_` while holding the send lock instead.
+  // `sequencer` while holding the send lock instead.
   return rtp_sender_->packet_generator.GeneratePadding(
       target_size_bytes, rtp_sender_->packet_sender.MediaHasBeenSent(),
-
       rtp_sender_->deferred_sequencing_
-          ? rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()
+          ? rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()
           : false);
 }
 
@@ -488,6 +478,11 @@
   return rtp_sender_->packet_generator.ExpectedPerPacketOverhead();
 }
 
+void ModuleRtpRtcpImpl2::OnPacketSendingThreadSwitched() {
+  // Ownership of sequencing is being transferred to another thread.
+  rtp_sender_->sequencing_checker.Detach();
+}
+
 size_t ModuleRtpRtcpImpl2::MaxRtpPacketSize() const {
   RTC_DCHECK(rtp_sender_);
   return rtp_sender_->packet_generator.MaxRtpPacketSize();
@@ -730,7 +725,7 @@
 }
 
 void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) {
-  RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
+  RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
   rtcp_receiver_.set_local_media_ssrc(local_ssrc);
   rtcp_sender_.SetSsrc(local_ssrc);
 }
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
index b468bb7..f01c0c0 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
@@ -169,6 +169,8 @@
 
   size_t ExpectedPerPacketOverhead() const override;
 
+  void OnPacketSendingThreadSwitched() override;
+
   // RTCP part.
 
   // Get RTCP status.
@@ -269,12 +271,9 @@
     // If false, sequencing is owned by `packet_generator` and can happen on
     // several threads. If true, sequencing always happens on the pacer thread.
     const bool deferred_sequencing_;
-    // TODO(bugs.webrtc.org/11340): Remove lock one we can guarantee that
-    // setting/getting rtp state only happens after removal from packet sending
-    // code path.
-    mutable Mutex mutex_sequencer_;
+    SequenceChecker sequencing_checker;
     // Handles sequence number assignment and padding timestamp generation.
-    PacketSequencer sequencer_ RTC_GUARDED_BY(mutex_sequencer_);
+    PacketSequencer sequencer RTC_GUARDED_BY(sequencing_checker);
     // Handles final time timestamping/stats/etc and handover to Transport.
     RtpSenderEgress packet_sender;
     // If no paced sender configured, this class will be used to pass packets
@@ -314,8 +313,7 @@
                                                TimeDelta duration);
 
   TaskQueueBase* const worker_queue_;
-  RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_;
-  RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_thread_checker_;
+  RTC_NO_UNIQUE_ADDRESS SequenceChecker rtcp_thread_checker_;
 
   std::unique_ptr<RtpSenderContext> rtp_sender_;
   RTCPSender rtcp_sender_;
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_interface.h b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
index e90d866..c1a44fa 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_interface.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
@@ -347,6 +347,15 @@
   // when we expect to send them).
   virtual size_t ExpectedPerPacketOverhead() const = 0;
 
+  // Access to packet state (e.g. sequence numbering) must only be access by
+  // one thread at a time. It may be only one thread, or a construction thread
+  // that calls SetRtpState() - handing over to a pacer thread that calls
+  // TrySendPacket() - and at teardown ownership is handed to a destruciton
+  // thread that calls GetRtpState().
+  // This method is used to signal that "ownership" of the rtp state is being
+  // transferred to another thread.
+  virtual void OnPacketSendingThreadSwitched() = 0;
+
   // **************************************************************************
   // RTCP
   // **************************************************************************