Migrate call/ to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: Ifcdcd343fcba1d850e40813bc08862c42647b0c5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268002
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37477}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index b72afed..e03f15d 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -526,6 +526,7 @@
       ]
       absl_deps = [
         "//third_party/abseil-cpp/absl/container:inlined_vector",
+        "//third_party/abseil-cpp/absl/functional:any_invocable",
         "//third_party/abseil-cpp/absl/memory",
         "//third_party/abseil-cpp/absl/strings",
         "//third_party/abseil-cpp/absl/types:optional",
diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc
index 1f9b662..54a683b 100644
--- a/call/adaptation/resource_adaptation_processor.cc
+++ b/call/adaptation/resource_adaptation_processor.cc
@@ -41,11 +41,11 @@
     OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,
                                  ResourceUsageState usage_state) {
   if (!task_queue_->IsCurrent()) {
-    task_queue_->PostTask(ToQueuedTask(
+    task_queue_->PostTask(
         [this_ref = rtc::scoped_refptr<ResourceListenerDelegate>(this),
          resource, usage_state] {
           this_ref->OnResourceUsageStateMeasured(resource, usage_state);
-        }));
+        });
     return;
   }
   RTC_DCHECK_RUN_ON(task_queue_);
@@ -142,8 +142,8 @@
 void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
     rtc::scoped_refptr<Resource> resource) {
   if (!task_queue_->IsCurrent()) {
-    task_queue_->PostTask(ToQueuedTask(
-        [this, resource]() { RemoveLimitationsImposedByResource(resource); }));
+    task_queue_->PostTask(
+        [this, resource]() { RemoveLimitationsImposedByResource(resource); });
     return;
   }
   RTC_DCHECK_RUN_ON(task_queue_);
diff --git a/call/adaptation/resource_adaptation_processor_unittest.cc b/call/adaptation/resource_adaptation_processor_unittest.cc
index 97c01b3..7f09e22 100644
--- a/call/adaptation/resource_adaptation_processor_unittest.cc
+++ b/call/adaptation/resource_adaptation_processor_unittest.cc
@@ -430,8 +430,8 @@
   SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize);
 
   TaskQueueForTest resource_task_queue("ResourceTaskQueue");
-  resource_task_queue.PostTask(ToQueuedTask(
-      [&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); }));
+  resource_task_queue.PostTask(
+      [&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); });
 
   EXPECT_EQ_WAIT(1u, restrictions_listener_.restrictions_updated_count(),
                  kDefaultTimeoutMs);
@@ -447,10 +447,10 @@
   // has passed it on to the processor's task queue.
   rtc::Event resource_event;
   TaskQueueForTest resource_task_queue("ResourceTaskQueue");
-  resource_task_queue.PostTask(ToQueuedTask([&]() {
+  resource_task_queue.PostTask([&]() {
     resource_->SetUsageState(ResourceUsageState::kOveruse);
     resource_event.Set();
-  }));
+  });
 
   EXPECT_TRUE(resource_event.Wait(kDefaultTimeoutMs));
   // Now destroy the processor while handling the overuse is in flight.
@@ -470,10 +470,10 @@
   rtc::Event overuse_event;
   TaskQueueForTest resource_task_queue("ResourceTaskQueue");
   // Queues task for `resource_` overuse while `processor_` is still listening.
-  resource_task_queue.PostTask(ToQueuedTask([&]() {
+  resource_task_queue.PostTask([&]() {
     resource_->SetUsageState(ResourceUsageState::kOveruse);
     overuse_event.Set();
-  }));
+  });
   EXPECT_TRUE(overuse_event.Wait(kDefaultTimeoutMs));
   // Once we know the overuse task is queued, remove `resource_` so that
   // `processor_` is not listening to it.
diff --git a/call/call.cc b/call/call.cc
index ad08b8a..9be0953 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -1209,14 +1209,14 @@
   } else {
     // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to
     // post to the worker thread.
-    worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure)));
+    worker_thread_->PostTask(SafeTask(task_safety_.flag(), std::move(closure)));
   }
 }
 
 void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
   RTC_DCHECK_RUN_ON(network_thread_);
   worker_thread_->PostTask(
-      ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() {
+      SafeTask(task_safety_.flag(), [this, transport_overhead_per_packet]() {
         // TODO(bugs.webrtc.org/11993): Move this over to the network thread.
         RTC_DCHECK_RUN_ON(worker_thread_);
         for (auto& kv : audio_send_ssrcs_) {
@@ -1408,7 +1408,7 @@
   // TODO(bugs.webrtc.org/11993): This should execute directly on the network
   // thread.
   worker_thread_->PostTask(
-      ToQueuedTask(task_safety_, [this, packet = std::move(packet)]() {
+      SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() {
         RTC_DCHECK_RUN_ON(worker_thread_);
 
         receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc
index 5be6385..54892f0 100644
--- a/call/call_perf_tests.cc
+++ b/call/call_perf_tests.cc
@@ -113,7 +113,7 @@
         task_queue_(task_queue) {}
 
   void OnFrame(const VideoFrame& video_frame) override {
-    task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
+    task_queue_->PostTask([this]() { CheckStats(); });
   }
 
   void CheckStats() {
@@ -343,7 +343,7 @@
   }
 
   task_queue()->PostTask(
-      ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
+      [to_delete = observer.release()]() { delete to_delete; });
 }
 
 TEST_F(CallPerfTest, Synchronization_PlaysOutAudioAndVideoWithoutClockDrift) {
@@ -680,7 +680,7 @@
    private:
     // TODO(holmer): Run this with a timer instead of once per packet.
     Action OnSendRtp(const uint8_t* packet, size_t length) override {
-      task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
+      task_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
         VideoSendStream::Stats stats = send_stream_->GetStats();
 
         if (!stats.substreams.empty()) {
@@ -1146,7 +1146,7 @@
       const Timestamp now = clock_->CurrentTime();
       if (now - last_getstats_time_ > kMinGetStatsInterval) {
         last_getstats_time_ = now;
-        task_queue_->PostTask(ToQueuedTask([this, now]() {
+        task_queue_->PostTask([this, now]() {
           VideoSendStream::Stats stats = send_stream_->GetStats();
           for (const auto& stat : stats.substreams) {
             encode_frame_rate_lists_[stat.first].push_back(
@@ -1156,7 +1156,7 @@
             VerifyStats();
             observation_complete_.Set();
           }
-        }));
+        });
       }
       return SEND_PACKET;
     }
diff --git a/call/degraded_call.cc b/call/degraded_call.cc
index 2c01c99..bc3f587 100644
--- a/call/degraded_call.cc
+++ b/call/degraded_call.cc
@@ -62,20 +62,20 @@
     return false;
   }
 
-  task_queue_->PostTask(ToQueuedTask(task_safety_, [this, time_to_next] {
+  task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] {
     RTC_DCHECK_RUN_ON(task_queue_);
     int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds();
     if (!next_process_ms_ || next_process_time < *next_process_ms_) {
       next_process_ms_ = next_process_time;
       task_queue_->PostDelayedHighPrecisionTask(
-          ToQueuedTask(task_safety_,
-                       [this] {
-                         RTC_DCHECK_RUN_ON(task_queue_);
-                         if (!Process()) {
-                           next_process_ms_.reset();
-                         }
-                       }),
-          *time_to_next);
+          SafeTask(task_safety_.flag(),
+                   [this] {
+                     RTC_DCHECK_RUN_ON(task_queue_);
+                     if (!Process()) {
+                       next_process_ms_.reset();
+                     }
+                   }),
+          TimeDelta::Millis(*time_to_next));
     }
   }));
 
@@ -146,8 +146,9 @@
     receive_pipe_->SetReceiver(call_->Receiver());
     if (receive_configs_.size() > 1) {
       call_->network_thread()->PostDelayedTask(
-          ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }),
-          receive_configs_[0].duration.ms());
+          SafeTask(task_safety_.flag(),
+                   [this] { UpdateReceiveNetworkConfig(); }),
+          receive_configs_[0].duration);
     }
   }
   if (!send_configs_.empty()) {
@@ -157,8 +158,8 @@
         call_->network_thread(), task_safety_, clock_, std::move(network));
     if (send_configs_.size() > 1) {
       call_->network_thread()->PostDelayedTask(
-          ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }),
-          send_configs_[0].duration.ms());
+          SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
+          send_configs_[0].duration);
     }
   }
 }
@@ -352,8 +353,8 @@
   send_config_index_ = (send_config_index_ + 1) % send_configs_.size();
   send_simulated_network_->SetConfig(send_configs_[send_config_index_]);
   call_->network_thread()->PostDelayedTask(
-      ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }),
-      send_configs_[send_config_index_].duration.ms());
+      SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
+      send_configs_[send_config_index_].duration);
 }
 
 void DegradedCall::UpdateReceiveNetworkConfig() {
@@ -361,7 +362,7 @@
   receive_simulated_network_->SetConfig(
       receive_configs_[receive_config_index_]);
   call_->network_thread()->PostDelayedTask(
-      ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }),
-      receive_configs_[receive_config_index_].duration.ms());
+      SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }),
+      receive_configs_[receive_config_index_].duration);
 }
 }  // namespace webrtc
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index e59ea74..b64cc78 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -15,6 +15,7 @@
 #include <string>
 #include <utility>
 
+#include "absl/functional/any_invocable.h"
 #include "call/rtp_transport_controller_send.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
 #include "modules/rtp_rtcp/source/byte_io.h"
@@ -203,10 +204,9 @@
   // default thread as the transport queue, explicit checks for the transport
   // queue (not just using a SequenceChecker) aren't possible unless such a
   // queue is actually active. So RunOnTransportQueue is a convenience function
-  // that allow for running a closure on the transport queue, similar to
+  // that allow for running a `task` on the transport queue, similar to
   // SendTask().
-  template <typename Closure>
-  void RunOnTransportQueue(Closure&& task) {
+  void RunOnTransportQueue(absl::AnyInvocable<void() &&> task) {
     transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
     AdvanceTime(TimeDelta::Millis(0));
   }