Add RtcpTransceiver::Stop to allow non-blocking destruction

As a downside, it disallows destroying RtcpTransceiver on the TaskQueue
without a prior call to the Stop function.

BUG: webrtc:8239
Change-Id: I236b9aff7a0746044dd764c619174cc5ac03d27f
Reviewed-on: https://webrtc-review.googlesource.com/98120
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24587}
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver.cc b/modules/rtp_rtcp/source/rtcp_transceiver.cc
index 9a17929..e5ad3ba 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver.cc
+++ b/modules/rtp_rtcp/source/rtcp_transceiver.cc
@@ -22,42 +22,48 @@
 
 RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config)
     : task_queue_(config.task_queue),
-      rtcp_transceiver_(absl::make_unique<RtcpTransceiverImpl>(config)),
-      ptr_factory_(rtcp_transceiver_.get()),
-      // Creating first weak ptr can be done on any thread, but is not
-      // thread-safe, thus do it at construction. Creating second (e.g. making a
-      // copy) is thread-safe.
-      ptr_(ptr_factory_.GetWeakPtr()) {
+      rtcp_transceiver_(absl::make_unique<RtcpTransceiverImpl>(config)) {
   RTC_DCHECK(task_queue_);
 }
 
 RtcpTransceiver::~RtcpTransceiver() {
-  if (task_queue_->IsCurrent())
+  if (!rtcp_transceiver_)
     return;
+  RTC_CHECK(!task_queue_->IsCurrent());
 
   rtc::Event done(false, false);
   // TODO(danilchap): Merge cleanup into main closure when task queue does not
   // silently drop tasks.
   task_queue_->PostTask(rtc::NewClosure(
       [this] {
-        // Destructor steps that has to run on the task_queue_.
-        ptr_factory_.InvalidateWeakPtrs();
+        // Destructor steps that better run on the task_queue_.
         rtcp_transceiver_.reset();
       },
       /*cleanup=*/[&done] { done.Set(); }));
-  // Wait until destruction is complete to be sure weak pointers invalidated and
-  // rtcp_transceiver destroyed on the queue while |this| still valid.
+  // Wait until destruction is complete to guarantee callbacks are not used
+  // after destructor returns.
   done.Wait(rtc::Event::kForever);
   RTC_CHECK(!rtcp_transceiver_) << "Task queue is too busy to handle rtcp";
 }
 
+void RtcpTransceiver::Stop(std::unique_ptr<rtc::QueuedTask> on_destroyed) {
+  RTC_DCHECK(rtcp_transceiver_);
+  struct Destructor {
+    void operator()() { rtcp_transceiver = nullptr; }
+    std::unique_ptr<RtcpTransceiverImpl> rtcp_transceiver;
+  };
+  task_queue_->PostTaskAndReply(Destructor{std::move(rtcp_transceiver_)},
+                                std::move(on_destroyed));
+  RTC_DCHECK(!rtcp_transceiver_);
+}
+
 void RtcpTransceiver::AddMediaReceiverRtcpObserver(
     uint32_t remote_ssrc,
     MediaReceiverRtcpObserver* observer) {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
   task_queue_->PostTask([ptr, remote_ssrc, observer] {
-    if (ptr)
-      ptr->AddMediaReceiverRtcpObserver(remote_ssrc, observer);
+    ptr->AddMediaReceiverRtcpObserver(remote_ssrc, observer);
   });
 }
 
@@ -65,59 +71,60 @@
     uint32_t remote_ssrc,
     MediaReceiverRtcpObserver* observer,
     std::unique_ptr<rtc::QueuedTask> on_removed) {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
   auto remove = [ptr, remote_ssrc, observer] {
-    if (ptr)
-      ptr->RemoveMediaReceiverRtcpObserver(remote_ssrc, observer);
+    ptr->RemoveMediaReceiverRtcpObserver(remote_ssrc, observer);
   };
   task_queue_->PostTaskAndReply(std::move(remove), std::move(on_removed));
 }
 
 void RtcpTransceiver::SetReadyToSend(bool ready) {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
   task_queue_->PostTask([ptr, ready] {
-    if (ptr)
       ptr->SetReadyToSend(ready);
   });
 }
 
 void RtcpTransceiver::ReceivePacket(rtc::CopyOnWriteBuffer packet) {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
   int64_t now_us = rtc::TimeMicros();
   task_queue_->PostTask([ptr, packet, now_us] {
-    if (ptr)
       ptr->ReceivePacket(packet, now_us);
   });
 }
 
 void RtcpTransceiver::SendCompoundPacket() {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
   task_queue_->PostTask([ptr] {
-    if (ptr)
       ptr->SendCompoundPacket();
   });
 }
 
 void RtcpTransceiver::SetRemb(int64_t bitrate_bps,
                               std::vector<uint32_t> ssrcs) {
+  RTC_CHECK(rtcp_transceiver_);
   // TODO(danilchap): Replace with lambda with move capture when available.
   struct SetRembClosure {
     void operator()() {
-      if (ptr)
         ptr->SetRemb(bitrate_bps, std::move(ssrcs));
     }
 
-    rtc::WeakPtr<RtcpTransceiverImpl> ptr;
+    RtcpTransceiverImpl* ptr;
     int64_t bitrate_bps;
     std::vector<uint32_t> ssrcs;
   };
-  task_queue_->PostTask(SetRembClosure{ptr_, bitrate_bps, std::move(ssrcs)});
+  task_queue_->PostTask(
+      SetRembClosure{rtcp_transceiver_.get(), bitrate_bps, std::move(ssrcs)});
 }
 
 void RtcpTransceiver::UnsetRemb() {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
   task_queue_->PostTask([ptr] {
-    if (ptr)
       ptr->UnsetRemb();
   });
 }
@@ -128,54 +135,53 @@
 
 bool RtcpTransceiver::SendFeedbackPacket(
     const rtcp::TransportFeedback& packet) {
+  RTC_CHECK(rtcp_transceiver_);
   struct Closure {
     void operator()() {
-      if (ptr)
         ptr->SendRawPacket(raw_packet);
     }
-    rtc::WeakPtr<RtcpTransceiverImpl> ptr;
+    RtcpTransceiverImpl* ptr;
     rtc::Buffer raw_packet;
   };
-  task_queue_->PostTask(Closure{ptr_, packet.Build()});
+  task_queue_->PostTask(Closure{rtcp_transceiver_.get(), packet.Build()});
   return true;
 }
 
 void RtcpTransceiver::SendNack(uint32_t ssrc,
                                std::vector<uint16_t> sequence_numbers) {
+  RTC_CHECK(rtcp_transceiver_);
   // TODO(danilchap): Replace with lambda with move capture when available.
   struct Closure {
     void operator()() {
-      if (ptr)
         ptr->SendNack(ssrc, std::move(sequence_numbers));
     }
 
-    rtc::WeakPtr<RtcpTransceiverImpl> ptr;
+    RtcpTransceiverImpl* ptr;
     uint32_t ssrc;
     std::vector<uint16_t> sequence_numbers;
   };
-  task_queue_->PostTask(Closure{ptr_, ssrc, std::move(sequence_numbers)});
+  task_queue_->PostTask(
+      Closure{rtcp_transceiver_.get(), ssrc, std::move(sequence_numbers)});
 }
 
 void RtcpTransceiver::SendPictureLossIndication(uint32_t ssrc) {
-  rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
-  task_queue_->PostTask([ptr, ssrc] {
-    if (ptr)
-      ptr->SendPictureLossIndication(ssrc);
-  });
+  RTC_CHECK(rtcp_transceiver_);
+  RtcpTransceiverImpl* ptr = rtcp_transceiver_.get();
+  task_queue_->PostTask([ptr, ssrc] { ptr->SendPictureLossIndication(ssrc); });
 }
 
 void RtcpTransceiver::SendFullIntraRequest(std::vector<uint32_t> ssrcs) {
+  RTC_CHECK(rtcp_transceiver_);
   // TODO(danilchap): Replace with lambda with move capture when available.
   struct Closure {
     void operator()() {
-      if (ptr)
         ptr->SendFullIntraRequest(ssrcs);
     }
 
-    rtc::WeakPtr<RtcpTransceiverImpl> ptr;
+    RtcpTransceiverImpl* ptr;
     std::vector<uint32_t> ssrcs;
   };
-  task_queue_->PostTask(Closure{ptr_, std::move(ssrcs)});
+  task_queue_->PostTask(Closure{rtcp_transceiver_.get(), std::move(ssrcs)});
 }
 
 }  // namespace webrtc