Update TaskQueueWin implementation to absl::AnyInvocable
Bug: webrtc:14245
Change-Id: I4203f4dbbdc9c2ee4a6440942215341182f180db
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/269000
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37571}
diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc
index dd14a7d..bb8e522 100644
--- a/rtc_base/task_queue_win.cc
+++ b/rtc_base/task_queue_win.cc
@@ -24,14 +24,17 @@
#include <string.h>
#include <algorithm>
+#include <functional>
#include <memory>
#include <queue>
#include <utility>
+#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
-#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
#include "rtc_base/arraysize.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
@@ -43,7 +46,6 @@
namespace webrtc {
namespace {
-#define WM_RUN_TASK WM_USER + 1
#define WM_QUEUE_DELAYED_TASK WM_USER + 2
void CALLBACK InitializeQueueThread(ULONG_PTR param) {
@@ -65,10 +67,10 @@
}
}
-int64_t GetTick() {
+Timestamp CurrentTime() {
static const UINT kPeriod = 1;
bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
- int64_t ret = rtc::TimeMillis();
+ Timestamp ret = Timestamp::Micros(rtc::TimeMicros());
if (high_res)
timeEndPeriod(kPeriod);
return ret;
@@ -78,8 +80,8 @@
public:
// Default ctor needed to support priority_queue::pop().
DelayedTaskInfo() {}
- DelayedTaskInfo(uint32_t milliseconds, std::unique_ptr<QueuedTask> task)
- : due_time_(GetTick() + milliseconds), task_(std::move(task)) {}
+ DelayedTaskInfo(TimeDelta delay, absl::AnyInvocable<void() &&> task)
+ : due_time_(CurrentTime() + delay), task_(std::move(task)) {}
DelayedTaskInfo(DelayedTaskInfo&&) = default;
// Implement for priority_queue.
@@ -92,14 +94,14 @@
// See below for why this method is const.
void Run() const {
- RTC_DCHECK(due_time_);
- task_->Run() ? task_.reset() : static_cast<void>(task_.release());
+ RTC_DCHECK(task_);
+ std::move(task_)();
}
- int64_t due_time() const { return due_time_; }
+ Timestamp due_time() const { return due_time_; }
private:
- int64_t due_time_ = 0; // Absolute timestamp in milliseconds.
+ Timestamp due_time_ = Timestamp::Zero();
// `task` needs to be mutable because std::priority_queue::top() returns
// a const reference and a key in an ordered queue must not be changed.
@@ -108,7 +110,7 @@
// (`task`), mutable.
// Because of this, the `task` variable is made private and can only be
// mutated by calling the `Run()` method.
- mutable std::unique_ptr<QueuedTask> task_;
+ mutable absl::AnyInvocable<void() &&> task_;
};
class MultimediaTimer {
@@ -158,10 +160,11 @@
~TaskQueueWin() override = default;
void Delete() override;
- void PostTask(std::unique_ptr<QueuedTask> task) override;
- void PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) override;
-
+ void PostTask(absl::AnyInvocable<void() &&> task) override;
+ void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) override;
+ void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) override;
void RunPendingTasks();
private:
@@ -171,25 +174,18 @@
void ScheduleNextTimer();
void CancelTimers();
+ MultimediaTimer timer_;
// Since priority_queue<> by defult orders items in terms of
// largest->smallest, using std::less<>, and we want smallest->largest,
- // we would like to use std::greater<> here. Alas it's only available in
- // C++14 and later, so we roll our own compare template that that relies on
- // operator<().
- template <typename T>
- struct greater {
- bool operator()(const T& l, const T& r) { return l > r; }
- };
-
- MultimediaTimer timer_;
+ // we use std::greater<> as the comparator here.
std::priority_queue<DelayedTaskInfo,
std::vector<DelayedTaskInfo>,
- greater<DelayedTaskInfo>>
+ std::greater<DelayedTaskInfo>>
timer_tasks_;
UINT_PTR timer_id_ = 0;
rtc::PlatformThread thread_;
Mutex pending_lock_;
- std::queue<std::unique_ptr<QueuedTask>> pending_
+ std::queue<absl::AnyInvocable<void() &&>> pending_
RTC_GUARDED_BY(pending_lock_);
HANDLE in_queue_;
};
@@ -221,24 +217,20 @@
delete this;
}
-void TaskQueueWin::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueWin::PostTask(absl::AnyInvocable<void() &&> task) {
MutexLock lock(&pending_lock_);
pending_.push(std::move(task));
::SetEvent(in_queue_);
}
-void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- if (!milliseconds) {
+void TaskQueueWin::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ if (delay <= TimeDelta::Zero()) {
PostTask(std::move(task));
return;
}
- // TODO(tommi): Avoid this allocation. It is currently here since
- // the timestamp stored in the task info object, is a 64bit timestamp
- // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
- // task pointer and timestamp as LPARAM and WPARAM.
- auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
+ auto* task_info = new DelayedTaskInfo(delay, std::move(task));
RTC_CHECK(thread_.GetHandle() != absl::nullopt);
if (!::PostThreadMessage(GetThreadId(*thread_.GetHandle()),
WM_QUEUE_DELAYED_TASK, 0,
@@ -247,9 +239,15 @@
}
}
+void TaskQueueWin::PostDelayedHighPrecisionTask(
+ absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ PostDelayedTask(std::move(task), delay);
+}
+
void TaskQueueWin::RunPendingTasks() {
while (true) {
- std::unique_ptr<QueuedTask> task;
+ absl::AnyInvocable<void() &&> task;
{
MutexLock lock(&pending_lock_);
if (pending_.empty())
@@ -258,8 +256,7 @@
pending_.pop();
}
- if (!task->Run())
- task.release();
+ std::move(task)();
}
}
@@ -300,26 +297,19 @@
// To protect against overly busy message queues, we limit the time
// we process tasks to a few milliseconds. If we don't do that, there's
// a chance that timer tasks won't ever run.
- static const int kMaxTaskProcessingTimeMs = 500;
- auto start = GetTick();
+ static constexpr TimeDelta kMaxTaskProcessingTime = TimeDelta::Millis(500);
+ Timestamp start = CurrentTime();
while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
msg.message != WM_QUIT) {
if (!msg.hwnd) {
switch (msg.message) {
- // TODO(tommi): Stop using this way of queueing tasks.
- case WM_RUN_TASK: {
- QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
- if (task->Run())
- delete task;
- break;
- }
case WM_QUEUE_DELAYED_TASK: {
std::unique_ptr<DelayedTaskInfo> info(
reinterpret_cast<DelayedTaskInfo*>(msg.lParam));
bool need_to_schedule_timers =
timer_tasks_.empty() ||
timer_tasks_.top().due_time() > info->due_time();
- timer_tasks_.emplace(std::move(*info.get()));
+ timer_tasks_.push(std::move(*info));
if (need_to_schedule_timers) {
CancelTimers();
ScheduleNextTimer();
@@ -343,7 +333,7 @@
::DispatchMessage(&msg);
}
- if (GetTick() > start + kMaxTaskProcessingTimeMs)
+ if (CurrentTime() > start + kMaxTaskProcessingTime)
break;
}
return msg.message != WM_QUIT;
@@ -351,7 +341,7 @@
void TaskQueueWin::RunDueTasks() {
RTC_DCHECK(!timer_tasks_.empty());
- auto now = GetTick();
+ Timestamp now = CurrentTime();
do {
const auto& top = timer_tasks_.top();
if (top.due_time() > now)
@@ -367,8 +357,9 @@
return;
const auto& next_task = timer_tasks_.top();
- int64_t delay_ms = std::max(0ll, next_task.due_time() - GetTick());
- uint32_t milliseconds = rtc::dchecked_cast<uint32_t>(delay_ms);
+ TimeDelta delay =
+ std::max(TimeDelta::Zero(), next_task.due_time() - CurrentTime());
+ uint32_t milliseconds = delay.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>();
if (!timer_.StartOneShotTimer(milliseconds))
timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
}