Migrate rtc_base and rtc_tools to absl::AnyInvocable-based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: I71abe3db7a23ad33bd175297e23fa8e927fa9628
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268768
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37553}
diff --git a/rtc_tools/network_tester/BUILD.gn b/rtc_tools/network_tester/BUILD.gn
index bb1f5d9..47932de 100644
--- a/rtc_tools/network_tester/BUILD.gn
+++ b/rtc_tools/network_tester/BUILD.gn
@@ -42,6 +42,7 @@
       "../../api:sequence_checker",
       "../../api/task_queue",
       "../../api/task_queue:default_task_queue_factory",
+      "../../api/task_queue:pending_task_safety_flag",
       "../../p2p:rtc_p2p",
       "../../rtc_base",
       "../../rtc_base:checks",
@@ -58,7 +59,10 @@
       "../../rtc_base/system:no_unique_address",
       "../../rtc_base/third_party/sigslot",
     ]
-    absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+    absl_deps = [
+      "//third_party/abseil-cpp/absl/functional:any_invocable",
+      "//third_party/abseil-cpp/absl/types:optional",
+    ]
   }
 
   network_tester_unittests_resources = [
diff --git a/rtc_tools/network_tester/packet_sender.cc b/rtc_tools/network_tester/packet_sender.cc
index b80bb98..c991737 100644
--- a/rtc_tools/network_tester/packet_sender.cc
+++ b/rtc_tools/network_tester/packet_sender.cc
@@ -15,7 +15,8 @@
 #include <string>
 #include <utility>
 
-#include "api/task_queue/queued_task.h"
+#include "absl/functional/any_invocable.h"
+#include "api/task_queue/pending_task_safety_flag.h"
 #include "api/task_queue/task_queue_base.h"
 #include "rtc_base/time_utils.h"
 #include "rtc_tools/network_tester/config_reader.h"
@@ -25,65 +26,47 @@
 
 namespace {
 
-class SendPacketTask : public QueuedTask {
- public:
-  explicit SendPacketTask(
-      PacketSender* packet_sender,
-      rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
-      : target_time_ms_(rtc::TimeMillis()),
-        packet_sender_(packet_sender),
-        task_safety_flag_(task_safety_flag) {}
-
- private:
-  bool Run() override {
-    if (task_safety_flag_->alive() && packet_sender_->IsSending()) {
-      packet_sender_->SendPacket();
-      target_time_ms_ += packet_sender_->GetSendIntervalMs();
-      int64_t delay_ms = std::max(static_cast<int64_t>(0),
-                                  target_time_ms_ - rtc::TimeMillis());
+absl::AnyInvocable<void() &&> SendPacketTask(
+    PacketSender* packet_sender,
+    rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
+    int64_t target_time_ms = rtc::TimeMillis()) {
+  return [target_time_ms, packet_sender,
+          task_safety_flag = std::move(task_safety_flag)]() mutable {
+    if (task_safety_flag->alive() && packet_sender->IsSending()) {
+      packet_sender->SendPacket();
+      target_time_ms += packet_sender->GetSendIntervalMs();
+      int64_t delay_ms =
+          std::max(static_cast<int64_t>(0), target_time_ms - rtc::TimeMillis());
       TaskQueueBase::Current()->PostDelayedTask(
-          std::unique_ptr<QueuedTask>(this), delay_ms);
-      return false;
-    } else {
-      return true;
+          SendPacketTask(packet_sender, std::move(task_safety_flag),
+                         target_time_ms),
+          TimeDelta::Millis(delay_ms));
     }
-  }
-  int64_t target_time_ms_;
-  PacketSender* const packet_sender_;
-  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
-};
+  };
+}
 
-class UpdateTestSettingTask : public QueuedTask {
- public:
-  UpdateTestSettingTask(
-      PacketSender* packet_sender,
-      std::unique_ptr<ConfigReader> config_reader,
-      rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
-      : packet_sender_(packet_sender),
-        config_reader_(std::move(config_reader)),
-        task_safety_flag_(task_safety_flag) {}
-
- private:
-  bool Run() override {
-    if (!task_safety_flag_->alive()) {
-      return true;
+absl::AnyInvocable<void() &&> UpdateTestSettingTask(
+    PacketSender* packet_sender,
+    std::unique_ptr<ConfigReader> config_reader,
+    rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag) {
+  return [packet_sender, config_reader = std::move(config_reader),
+          task_safety_flag = std::move(task_safety_flag)]() mutable {
+    if (!task_safety_flag->alive()) {
+      return;
     }
-    auto config = config_reader_->GetNextConfig();
-    if (config) {
-      packet_sender_->UpdateTestSetting((*config).packet_size,
-                                        (*config).packet_send_interval_ms);
+    if (absl::optional<ConfigReader::Config> config =
+            config_reader->GetNextConfig()) {
+      packet_sender->UpdateTestSetting(config->packet_size,
+                                       config->packet_send_interval_ms);
       TaskQueueBase::Current()->PostDelayedTask(
-          std::unique_ptr<QueuedTask>(this), (*config).execution_time_ms);
-      return false;
+          UpdateTestSettingTask(packet_sender, std::move(config_reader),
+                                std::move(task_safety_flag)),
+          TimeDelta::Millis(config->execution_time_ms));
     } else {
-      packet_sender_->StopSending();
-      return true;
+      packet_sender->StopSending();
     }
-  }
-  PacketSender* const packet_sender_;
-  const std::unique_ptr<ConfigReader> config_reader_;
-  rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
-};
+  };
+}
 
 }  // namespace
 
@@ -105,15 +88,14 @@
 
 void PacketSender::StartSending() {
   worker_queue_checker_.Detach();
-  worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
+  worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
     RTC_DCHECK_RUN_ON(&worker_queue_checker_);
     sending_ = true;
   }));
-  worker_queue_->PostTask(std::make_unique<UpdateTestSettingTask>(
+  worker_queue_->PostTask(UpdateTestSettingTask(
       this, std::make_unique<ConfigReader>(config_file_path_),
       task_safety_flag_));
-  worker_queue_->PostTask(
-      std::make_unique<SendPacketTask>(this, task_safety_flag_));
+  worker_queue_->PostTask(SendPacketTask(this, task_safety_flag_));
 }
 
 void PacketSender::StopSending() {
diff --git a/rtc_tools/network_tester/packet_sender.h b/rtc_tools/network_tester/packet_sender.h
index 323f75b..7fd500f 100644
--- a/rtc_tools/network_tester/packet_sender.h
+++ b/rtc_tools/network_tester/packet_sender.h
@@ -15,10 +15,10 @@
 #include <string>
 
 #include "api/sequence_checker.h"
+#include "api/task_queue/pending_task_safety_flag.h"
 #include "api/task_queue/task_queue_factory.h"
 #include "rtc_base/ignore_wundef.h"
 #include "rtc_base/system/no_unique_address.h"
-#include "rtc_base/task_queue.h"
 
 #ifdef WEBRTC_NETWORK_TESTER_PROTO
 RTC_PUSH_IGNORING_WUNDEF()
diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc
index 6b16708..4ba43cc 100644
--- a/rtc_tools/network_tester/test_controller.cc
+++ b/rtc_tools/network_tester/test_controller.cc
@@ -67,7 +67,7 @@
 void TestController::SendData(const NetworkTesterPacket& packet,
                               absl::optional<size_t> data_size) {
   if (!packet_sender_thread_->IsCurrent()) {
-    packet_sender_thread_->PostTask(ToQueuedTask(
+    packet_sender_thread_->PostTask(SafeTask(
         task_safety_flag_,
         [this, packet, data_size]() { this->SendData(packet, data_size); }));
     return;
diff --git a/rtc_tools/video_replay.cc b/rtc_tools/video_replay.cc
index 346d962..3f35a55 100644
--- a/rtc_tools/video_replay.cc
+++ b/rtc_tools/video_replay.cc
@@ -355,7 +355,7 @@
 
     // Creation of the streams must happen inside a task queue because it is
     // resued as a worker thread.
-    worker_thread->PostTask(ToQueuedTask([&]() {
+    worker_thread->PostTask([&]() {
       call.reset(Call::Create(call_config));
 
       // Attempt to load the configuration
@@ -375,7 +375,7 @@
         receive_stream->Start();
       }
       sync_event.Set();
-    }));
+    });
 
     // Attempt to create an RtpReader from the input file.
     std::unique_ptr<test::RtpFileReader> rtp_reader =
@@ -392,7 +392,7 @@
 
     // Destruction of streams and the call must happen on the same thread as
     // their creation.
-    worker_thread->PostTask(ToQueuedTask([&]() {
+    worker_thread->PostTask([&]() {
       for (const auto& receive_stream : stream_state->receive_streams) {
         call->DestroyVideoReceiveStream(receive_stream);
       }
@@ -401,7 +401,7 @@
       }
       call.reset();
       sync_event.Set();
-    }));
+    });
     sync_event.Wait(/*give_up_after_ms=*/10000);
   }
 
@@ -606,12 +606,12 @@
 
       ++num_packets;
       PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK;
-      worker_thread->PostTask(ToQueuedTask([&]() {
+      worker_thread->PostTask([&]() {
         result = call->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO,
                                                  std::move(packet_buffer),
                                                  /* packet_time_us */ -1);
         event.Set();
-      }));
+      });
       event.Wait(/*give_up_after_ms=*/10000);
       switch (result) {
         case PacketReceiver::DELIVERY_OK: