Migrate rtc_base and rtc_tools to absl::AnyInvocable based TaskQueueBase interface
Bug: webrtc:14245
Change-Id: I71abe3db7a23ad33bd175297e23fa8e927fa9628
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268768
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37553}
diff --git a/rtc_tools/network_tester/BUILD.gn b/rtc_tools/network_tester/BUILD.gn
index bb1f5d9..47932de 100644
--- a/rtc_tools/network_tester/BUILD.gn
+++ b/rtc_tools/network_tester/BUILD.gn
@@ -42,6 +42,7 @@
"../../api:sequence_checker",
"../../api/task_queue",
"../../api/task_queue:default_task_queue_factory",
+ "../../api/task_queue:pending_task_safety_flag",
"../../p2p:rtc_p2p",
"../../rtc_base",
"../../rtc_base:checks",
@@ -58,7 +59,10 @@
"../../rtc_base/system:no_unique_address",
"../../rtc_base/third_party/sigslot",
]
- absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
+ absl_deps = [
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
+ "//third_party/abseil-cpp/absl/types:optional",
+ ]
}
network_tester_unittests_resources = [
diff --git a/rtc_tools/network_tester/packet_sender.cc b/rtc_tools/network_tester/packet_sender.cc
index b80bb98..c991737 100644
--- a/rtc_tools/network_tester/packet_sender.cc
+++ b/rtc_tools/network_tester/packet_sender.cc
@@ -15,7 +15,8 @@
#include <string>
#include <utility>
-#include "api/task_queue/queued_task.h"
+#include "absl/functional/any_invocable.h"
+#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/time_utils.h"
#include "rtc_tools/network_tester/config_reader.h"
@@ -25,65 +26,47 @@
namespace {
-class SendPacketTask : public QueuedTask {
- public:
- explicit SendPacketTask(
- PacketSender* packet_sender,
- rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
- : target_time_ms_(rtc::TimeMillis()),
- packet_sender_(packet_sender),
- task_safety_flag_(task_safety_flag) {}
-
- private:
- bool Run() override {
- if (task_safety_flag_->alive() && packet_sender_->IsSending()) {
- packet_sender_->SendPacket();
- target_time_ms_ += packet_sender_->GetSendIntervalMs();
- int64_t delay_ms = std::max(static_cast<int64_t>(0),
- target_time_ms_ - rtc::TimeMillis());
+absl::AnyInvocable<void() &&> SendPacketTask(
+ PacketSender* packet_sender,
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
+ int64_t target_time_ms = rtc::TimeMillis()) {
+ return [target_time_ms, packet_sender,
+ task_safety_flag = std::move(task_safety_flag)]() mutable {
+ if (task_safety_flag->alive() && packet_sender->IsSending()) {
+ packet_sender->SendPacket();
+ target_time_ms += packet_sender->GetSendIntervalMs();
+ int64_t delay_ms =
+ std::max(static_cast<int64_t>(0), target_time_ms - rtc::TimeMillis());
TaskQueueBase::Current()->PostDelayedTask(
- std::unique_ptr<QueuedTask>(this), delay_ms);
- return false;
- } else {
- return true;
+ SendPacketTask(packet_sender, std::move(task_safety_flag),
+ target_time_ms),
+ TimeDelta::Millis(delay_ms));
}
- }
- int64_t target_time_ms_;
- PacketSender* const packet_sender_;
- rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
-};
+ };
+}
-class UpdateTestSettingTask : public QueuedTask {
- public:
- UpdateTestSettingTask(
- PacketSender* packet_sender,
- std::unique_ptr<ConfigReader> config_reader,
- rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag)
- : packet_sender_(packet_sender),
- config_reader_(std::move(config_reader)),
- task_safety_flag_(task_safety_flag) {}
-
- private:
- bool Run() override {
- if (!task_safety_flag_->alive()) {
- return true;
+absl::AnyInvocable<void() &&> UpdateTestSettingTask(
+ PacketSender* packet_sender,
+ std::unique_ptr<ConfigReader> config_reader,
+ rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag) {
+ return [packet_sender, config_reader = std::move(config_reader),
+ task_safety_flag = std::move(task_safety_flag)]() mutable {
+ if (!task_safety_flag->alive()) {
+ return;
}
- auto config = config_reader_->GetNextConfig();
- if (config) {
- packet_sender_->UpdateTestSetting((*config).packet_size,
- (*config).packet_send_interval_ms);
+ if (absl::optional<ConfigReader::Config> config =
+ config_reader->GetNextConfig()) {
+ packet_sender->UpdateTestSetting(config->packet_size,
+ config->packet_send_interval_ms);
TaskQueueBase::Current()->PostDelayedTask(
- std::unique_ptr<QueuedTask>(this), (*config).execution_time_ms);
- return false;
+ UpdateTestSettingTask(packet_sender, std::move(config_reader),
+ std::move(task_safety_flag)),
+ TimeDelta::Millis(config->execution_time_ms));
} else {
- packet_sender_->StopSending();
- return true;
+ packet_sender->StopSending();
}
- }
- PacketSender* const packet_sender_;
- const std::unique_ptr<ConfigReader> config_reader_;
- rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag_;
-};
+ };
+}
} // namespace
@@ -105,15 +88,14 @@
void PacketSender::StartSending() {
worker_queue_checker_.Detach();
- worker_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
+ worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
sending_ = true;
}));
- worker_queue_->PostTask(std::make_unique<UpdateTestSettingTask>(
+ worker_queue_->PostTask(UpdateTestSettingTask(
this, std::make_unique<ConfigReader>(config_file_path_),
task_safety_flag_));
- worker_queue_->PostTask(
- std::make_unique<SendPacketTask>(this, task_safety_flag_));
+ worker_queue_->PostTask(SendPacketTask(this, task_safety_flag_));
}
void PacketSender::StopSending() {
diff --git a/rtc_tools/network_tester/packet_sender.h b/rtc_tools/network_tester/packet_sender.h
index 323f75b..7fd500f 100644
--- a/rtc_tools/network_tester/packet_sender.h
+++ b/rtc_tools/network_tester/packet_sender.h
@@ -15,10 +15,10 @@
#include <string>
#include "api/sequence_checker.h"
+#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_factory.h"
#include "rtc_base/ignore_wundef.h"
#include "rtc_base/system/no_unique_address.h"
-#include "rtc_base/task_queue.h"
#ifdef WEBRTC_NETWORK_TESTER_PROTO
RTC_PUSH_IGNORING_WUNDEF()
diff --git a/rtc_tools/network_tester/test_controller.cc b/rtc_tools/network_tester/test_controller.cc
index 6b16708..4ba43cc 100644
--- a/rtc_tools/network_tester/test_controller.cc
+++ b/rtc_tools/network_tester/test_controller.cc
@@ -67,7 +67,7 @@
void TestController::SendData(const NetworkTesterPacket& packet,
absl::optional<size_t> data_size) {
if (!packet_sender_thread_->IsCurrent()) {
- packet_sender_thread_->PostTask(ToQueuedTask(
+ packet_sender_thread_->PostTask(SafeTask(
task_safety_flag_,
[this, packet, data_size]() { this->SendData(packet, data_size); }));
return;
diff --git a/rtc_tools/video_replay.cc b/rtc_tools/video_replay.cc
index 346d962..3f35a55 100644
--- a/rtc_tools/video_replay.cc
+++ b/rtc_tools/video_replay.cc
@@ -355,7 +355,7 @@
// Creation of the streams must happen inside a task queue because it is
// reused as a worker thread.
- worker_thread->PostTask(ToQueuedTask([&]() {
+ worker_thread->PostTask([&]() {
call.reset(Call::Create(call_config));
// Attempt to load the configuration
@@ -375,7 +375,7 @@
receive_stream->Start();
}
sync_event.Set();
- }));
+ });
// Attempt to create an RtpReader from the input file.
std::unique_ptr<test::RtpFileReader> rtp_reader =
@@ -392,7 +392,7 @@
// Destruction of streams and the call must happen on the same thread as
// their creation.
- worker_thread->PostTask(ToQueuedTask([&]() {
+ worker_thread->PostTask([&]() {
for (const auto& receive_stream : stream_state->receive_streams) {
call->DestroyVideoReceiveStream(receive_stream);
}
@@ -401,7 +401,7 @@
}
call.reset();
sync_event.Set();
- }));
+ });
sync_event.Wait(/*give_up_after_ms=*/10000);
}
@@ -606,12 +606,12 @@
++num_packets;
PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK;
- worker_thread->PostTask(ToQueuedTask([&]() {
+ worker_thread->PostTask([&]() {
result = call->Receiver()->DeliverPacket(webrtc::MediaType::VIDEO,
std::move(packet_buffer),
/* packet_time_us */ -1);
event.Set();
- }));
+ });
event.Wait(/*give_up_after_ms=*/10000);
switch (result) {
case PacketReceiver::DELIVERY_OK: