Migrate call/ to absl::AnyInvocable based TaskQueueBase interface
Bug: webrtc:14245
Change-Id: Ifcdcd343fcba1d850e40813bc08862c42647b0c5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268002
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37477}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index b72afed..e03f15d 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -526,6 +526,7 @@
]
absl_deps = [
"//third_party/abseil-cpp/absl/container:inlined_vector",
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
diff --git a/call/adaptation/resource_adaptation_processor.cc b/call/adaptation/resource_adaptation_processor.cc
index 1f9b662..54a683b 100644
--- a/call/adaptation/resource_adaptation_processor.cc
+++ b/call/adaptation/resource_adaptation_processor.cc
@@ -41,11 +41,11 @@
OnResourceUsageStateMeasured(rtc::scoped_refptr<Resource> resource,
ResourceUsageState usage_state) {
if (!task_queue_->IsCurrent()) {
- task_queue_->PostTask(ToQueuedTask(
+ task_queue_->PostTask(
[this_ref = rtc::scoped_refptr<ResourceListenerDelegate>(this),
resource, usage_state] {
this_ref->OnResourceUsageStateMeasured(resource, usage_state);
- }));
+ });
return;
}
RTC_DCHECK_RUN_ON(task_queue_);
@@ -142,8 +142,8 @@
void ResourceAdaptationProcessor::RemoveLimitationsImposedByResource(
rtc::scoped_refptr<Resource> resource) {
if (!task_queue_->IsCurrent()) {
- task_queue_->PostTask(ToQueuedTask(
- [this, resource]() { RemoveLimitationsImposedByResource(resource); }));
+ task_queue_->PostTask(
+ [this, resource]() { RemoveLimitationsImposedByResource(resource); });
return;
}
RTC_DCHECK_RUN_ON(task_queue_);
diff --git a/call/adaptation/resource_adaptation_processor_unittest.cc b/call/adaptation/resource_adaptation_processor_unittest.cc
index 97c01b3..7f09e22 100644
--- a/call/adaptation/resource_adaptation_processor_unittest.cc
+++ b/call/adaptation/resource_adaptation_processor_unittest.cc
@@ -430,8 +430,8 @@
SetInputStates(true, kDefaultFrameRate, kDefaultFrameSize);
TaskQueueForTest resource_task_queue("ResourceTaskQueue");
- resource_task_queue.PostTask(ToQueuedTask(
- [&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); }));
+ resource_task_queue.PostTask(
+ [&]() { resource_->SetUsageState(ResourceUsageState::kOveruse); });
EXPECT_EQ_WAIT(1u, restrictions_listener_.restrictions_updated_count(),
kDefaultTimeoutMs);
@@ -447,10 +447,10 @@
// has passed it on to the processor's task queue.
rtc::Event resource_event;
TaskQueueForTest resource_task_queue("ResourceTaskQueue");
- resource_task_queue.PostTask(ToQueuedTask([&]() {
+ resource_task_queue.PostTask([&]() {
resource_->SetUsageState(ResourceUsageState::kOveruse);
resource_event.Set();
- }));
+ });
EXPECT_TRUE(resource_event.Wait(kDefaultTimeoutMs));
// Now destroy the processor while handling the overuse is in flight.
@@ -470,10 +470,10 @@
rtc::Event overuse_event;
TaskQueueForTest resource_task_queue("ResourceTaskQueue");
// Queues task for `resource_` overuse while `processor_` is still listening.
- resource_task_queue.PostTask(ToQueuedTask([&]() {
+ resource_task_queue.PostTask([&]() {
resource_->SetUsageState(ResourceUsageState::kOveruse);
overuse_event.Set();
- }));
+ });
EXPECT_TRUE(overuse_event.Wait(kDefaultTimeoutMs));
// Once we know the overuse task is queued, remove `resource_` so that
// `processor_` is not listening to it.
diff --git a/call/call.cc b/call/call.cc
index ad08b8a..9be0953 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -1209,14 +1209,14 @@
} else {
// TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to
// post to the worker thread.
- worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure)));
+ worker_thread_->PostTask(SafeTask(task_safety_.flag(), std::move(closure)));
}
}
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
RTC_DCHECK_RUN_ON(network_thread_);
worker_thread_->PostTask(
- ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() {
+ SafeTask(task_safety_.flag(), [this, transport_overhead_per_packet]() {
// TODO(bugs.webrtc.org/11993): Move this over to the network thread.
RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) {
@@ -1408,7 +1408,7 @@
// TODO(bugs.webrtc.org/11993): This should execute directly on the network
// thread.
worker_thread_->PostTask(
- ToQueuedTask(task_safety_, [this, packet = std::move(packet)]() {
+ SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() {
RTC_DCHECK_RUN_ON(worker_thread_);
receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
diff --git a/call/call_perf_tests.cc b/call/call_perf_tests.cc
index 5be6385..54892f0 100644
--- a/call/call_perf_tests.cc
+++ b/call/call_perf_tests.cc
@@ -113,7 +113,7 @@
task_queue_(task_queue) {}
void OnFrame(const VideoFrame& video_frame) override {
- task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
+ task_queue_->PostTask([this]() { CheckStats(); });
}
void CheckStats() {
@@ -343,7 +343,7 @@
}
task_queue()->PostTask(
- ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
+ [to_delete = observer.release()]() { delete to_delete; });
}
TEST_F(CallPerfTest, Synchronization_PlaysOutAudioAndVideoWithoutClockDrift) {
@@ -680,7 +680,7 @@
private:
// TODO(holmer): Run this with a timer instead of once per packet.
Action OnSendRtp(const uint8_t* packet, size_t length) override {
- task_queue_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
+ task_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
VideoSendStream::Stats stats = send_stream_->GetStats();
if (!stats.substreams.empty()) {
@@ -1146,7 +1146,7 @@
const Timestamp now = clock_->CurrentTime();
if (now - last_getstats_time_ > kMinGetStatsInterval) {
last_getstats_time_ = now;
- task_queue_->PostTask(ToQueuedTask([this, now]() {
+ task_queue_->PostTask([this, now]() {
VideoSendStream::Stats stats = send_stream_->GetStats();
for (const auto& stat : stats.substreams) {
encode_frame_rate_lists_[stat.first].push_back(
@@ -1156,7 +1156,7 @@
VerifyStats();
observation_complete_.Set();
}
- }));
+ });
}
return SEND_PACKET;
}
diff --git a/call/degraded_call.cc b/call/degraded_call.cc
index 2c01c99..bc3f587 100644
--- a/call/degraded_call.cc
+++ b/call/degraded_call.cc
@@ -62,20 +62,20 @@
return false;
}
- task_queue_->PostTask(ToQueuedTask(task_safety_, [this, time_to_next] {
+ task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] {
RTC_DCHECK_RUN_ON(task_queue_);
int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds();
if (!next_process_ms_ || next_process_time < *next_process_ms_) {
next_process_ms_ = next_process_time;
task_queue_->PostDelayedHighPrecisionTask(
- ToQueuedTask(task_safety_,
- [this] {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (!Process()) {
- next_process_ms_.reset();
- }
- }),
- *time_to_next);
+ SafeTask(task_safety_.flag(),
+ [this] {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ if (!Process()) {
+ next_process_ms_.reset();
+ }
+ }),
+ TimeDelta::Millis(*time_to_next));
}
}));
@@ -146,8 +146,9 @@
receive_pipe_->SetReceiver(call_->Receiver());
if (receive_configs_.size() > 1) {
call_->network_thread()->PostDelayedTask(
- ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }),
- receive_configs_[0].duration.ms());
+ SafeTask(task_safety_.flag(),
+ [this] { UpdateReceiveNetworkConfig(); }),
+ receive_configs_[0].duration);
}
}
if (!send_configs_.empty()) {
@@ -157,8 +158,8 @@
call_->network_thread(), task_safety_, clock_, std::move(network));
if (send_configs_.size() > 1) {
call_->network_thread()->PostDelayedTask(
- ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }),
- send_configs_[0].duration.ms());
+ SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
+ send_configs_[0].duration);
}
}
}
@@ -352,8 +353,8 @@
send_config_index_ = (send_config_index_ + 1) % send_configs_.size();
send_simulated_network_->SetConfig(send_configs_[send_config_index_]);
call_->network_thread()->PostDelayedTask(
- ToQueuedTask(task_safety_, [this] { UpdateSendNetworkConfig(); }),
- send_configs_[send_config_index_].duration.ms());
+ SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
+ send_configs_[send_config_index_].duration);
}
void DegradedCall::UpdateReceiveNetworkConfig() {
@@ -361,7 +362,7 @@
receive_simulated_network_->SetConfig(
receive_configs_[receive_config_index_]);
call_->network_thread()->PostDelayedTask(
- ToQueuedTask(task_safety_, [this] { UpdateReceiveNetworkConfig(); }),
- receive_configs_[receive_config_index_].duration.ms());
+ SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }),
+ receive_configs_[receive_config_index_].duration);
}
} // namespace webrtc
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index e59ea74..b64cc78 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -15,6 +15,7 @@
#include <string>
#include <utility>
+#include "absl/functional/any_invocable.h"
#include "call/rtp_transport_controller_send.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/byte_io.h"
@@ -203,10 +204,9 @@
// default thread as the transport queue, explicit checks for the transport
// queue (not just using a SequenceChecker) aren't possible unless such a
// queue is actually active. So RunOnTransportQueue is a convenience function
- // that allow for running a closure on the transport queue, similar to
+ // that allow for running a `task` on the transport queue, similar to
// SendTask().
- template <typename Closure>
- void RunOnTransportQueue(Closure&& task) {
+ void RunOnTransportQueue(absl::AnyInvocable<void() &&> task) {
transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
AdvanceTime(TimeDelta::Millis(0));
}