Migrate win task queue to TaskQueueBase interface
Bug: webrtc:10191
Change-Id: I498c4187883206d7082d9f7323575f087e041370
Reviewed-on: https://webrtc-review.googlesource.com/c/123485
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26791}
diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc
index b73aa95..696eda3 100644
--- a/rtc_base/task_queue_win.cc
+++ b/rtc_base/task_queue_win.cc
@@ -8,7 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "rtc_base/task_queue.h"
+#include "rtc_base/task_queue_win.h"
// clang-format off
// clang formating would change include order.
@@ -27,6 +27,10 @@
#include <queue>
#include <utility>
+#include "absl/memory/memory.h"
+#include "absl/strings/string_view.h"
+#include "api/task_queue/queued_task.h"
+#include "api/task_queue/task_queue_base.h"
#include "rtc_base/arraysize.h"
#include "rtc_base/checks.h"
#include "rtc_base/critical_section.h"
@@ -34,62 +38,40 @@
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/platform_thread.h"
-#include "rtc_base/ref_count.h"
-#include "rtc_base/ref_counted_object.h"
#include "rtc_base/time_utils.h"
-namespace rtc {
+namespace webrtc {
namespace {
#define WM_RUN_TASK WM_USER + 1
#define WM_QUEUE_DELAYED_TASK WM_USER + 2
-using Priority = TaskQueue::Priority;
-
-DWORD g_queue_ptr_tls = 0;
-
-BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
- g_queue_ptr_tls = TlsAlloc();
- return TRUE;
-}
-
-DWORD GetQueuePtrTls() {
- static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
- ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
- return g_queue_ptr_tls;
-}
-
-struct ThreadStartupData {
- Event* started;
- void* thread_context;
-};
-
void CALLBACK InitializeQueueThread(ULONG_PTR param) {
MSG msg;
::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
- ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
- ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
- data->started->Set();
+ rtc::Event* data = reinterpret_cast<rtc::Event*>(param);
+ data->Set();
}
-ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
+ TaskQueueFactory::Priority priority) {
switch (priority) {
- case Priority::HIGH:
- return kRealtimePriority;
- case Priority::LOW:
- return kLowPriority;
- case Priority::NORMAL:
- return kNormalPriority;
+ case TaskQueueFactory::Priority::HIGH:
+ return rtc::kRealtimePriority;
+ case TaskQueueFactory::Priority::LOW:
+ return rtc::kLowPriority;
+ case TaskQueueFactory::Priority::NORMAL:
+ return rtc::kNormalPriority;
default:
RTC_NOTREACHED();
break;
}
- return kNormalPriority;
+ return rtc::kNormalPriority;
}
int64_t GetTick() {
static const UINT kPeriod = 1;
bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
- int64_t ret = TimeMillis();
+ int64_t ret = rtc::TimeMillis();
if (high_res)
timeEndPeriod(kPeriod);
return ret;
@@ -168,81 +150,56 @@
RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
};
-} // namespace
-
-class TaskQueue::Impl : public RefCountInterface {
+class TaskQueueWin : public TaskQueueBase {
public:
- Impl(const char* queue_name, TaskQueue* queue, Priority priority);
- ~Impl() override;
+ TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority);
+ ~TaskQueueWin() override = default;
- static TaskQueue::Impl* Current();
- static TaskQueue* CurrentQueue();
-
- // Used for DCHECKing the current queue.
- bool IsCurrent() const;
-
- template <class Closure,
- typename std::enable_if<!std::is_convertible<
- Closure,
- std::unique_ptr<QueuedTask>>::value>::type* = nullptr>
- void PostTask(Closure&& closure) {
- PostTask(NewClosure(std::forward<Closure>(closure)));
- }
-
- void PostTask(std::unique_ptr<QueuedTask> task);
- void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+ void Delete() override;
+ void PostTask(std::unique_ptr<QueuedTask> task) override;
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) override;
void RunPendingTasks();
private:
static void ThreadMain(void* context);
- class WorkerThread : public PlatformThread {
+ class WorkerThread : public rtc::PlatformThread {
public:
- WorkerThread(ThreadRunFunction func,
+ WorkerThread(rtc::ThreadRunFunction func,
void* obj,
- const char* thread_name,
- ThreadPriority priority)
+ absl::string_view thread_name,
+ rtc::ThreadPriority priority)
: PlatformThread(func, obj, thread_name, priority) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
- return PlatformThread::QueueAPC(apc_function, data);
+ return rtc::PlatformThread::QueueAPC(apc_function, data);
}
};
- class ThreadState {
- public:
- explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {}
- ~ThreadState() {}
+ void RunThreadMain();
+ bool ProcessQueuedMessages();
+ void RunDueTasks();
+ void ScheduleNextTimer();
+ void CancelTimers();
- void RunThreadMain();
-
- private:
- bool ProcessQueuedMessages();
- void RunDueTasks();
- void ScheduleNextTimer();
- void CancelTimers();
-
- // Since priority_queue<> by defult orders items in terms of
- // largest->smallest, using std::less<>, and we want smallest->largest,
- // we would like to use std::greater<> here. Alas it's only available in
- // C++14 and later, so we roll our own compare template that that relies on
- // operator<().
- template <typename T>
- struct greater {
- bool operator()(const T& l, const T& r) { return l > r; }
- };
-
- MultimediaTimer timer_;
- std::priority_queue<DelayedTaskInfo,
- std::vector<DelayedTaskInfo>,
- greater<DelayedTaskInfo>>
- timer_tasks_;
- UINT_PTR timer_id_ = 0;
- HANDLE in_queue_;
+ // Since priority_queue<> by default orders items in terms of
+ // largest->smallest, using std::less<>, and we want smallest->largest,
+ // we would like to use std::greater<> here. Alas it's only available in
+ // C++14 and later, so we roll our own compare template that relies on
+ // operator>().
+ template <typename T>
+ struct greater {
+ bool operator()(const T& l, const T& r) { return l > r; }
};
- TaskQueue* const queue_;
+ MultimediaTimer timer_;
+ std::priority_queue<DelayedTaskInfo,
+ std::vector<DelayedTaskInfo>,
+ greater<DelayedTaskInfo>>
+ timer_tasks_;
+ UINT_PTR timer_id_ = 0;
WorkerThread thread_;
rtc::CriticalSection pending_lock_;
std::queue<std::unique_ptr<QueuedTask>> pending_
@@ -250,26 +207,19 @@
HANDLE in_queue_;
};
-TaskQueue::Impl::Impl(const char* queue_name,
- TaskQueue* queue,
- Priority priority)
- : queue_(queue),
- thread_(&TaskQueue::Impl::ThreadMain,
- this,
- queue_name,
- TaskQueuePriorityToThreadPriority(priority)),
+TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
+ rtc::ThreadPriority priority)
+ : thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority),
in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
- RTC_DCHECK(queue_name);
RTC_DCHECK(in_queue_);
thread_.Start();
- Event event(false, false);
- ThreadStartupData startup = {&event, this};
+ rtc::Event event(false, false);
RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
- reinterpret_cast<ULONG_PTR>(&startup)));
- event.Wait(Event::kForever);
+ reinterpret_cast<ULONG_PTR>(&event)));
+ event.Wait(rtc::Event::kForever);
}
-TaskQueue::Impl::~Impl() {
+void TaskQueueWin::Delete() {
RTC_DCHECK(!IsCurrent());
while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
@@ -277,31 +227,17 @@
}
thread_.Stop();
::CloseHandle(in_queue_);
+ delete this;
}
-// static
-TaskQueue::Impl* TaskQueue::Impl::Current() {
- return static_cast<TaskQueue::Impl*>(::TlsGetValue(GetQueuePtrTls()));
-}
-
-// static
-TaskQueue* TaskQueue::Impl::CurrentQueue() {
- TaskQueue::Impl* current = Current();
- return current ? current->queue_ : nullptr;
-}
-
-bool TaskQueue::Impl::IsCurrent() const {
- return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
-}
-
-void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueWin::PostTask(std::unique_ptr<QueuedTask> task) {
rtc::CritScope lock(&pending_lock_);
pending_.push(std::move(task));
::SetEvent(in_queue_);
}
-void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
+void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
if (!milliseconds) {
PostTask(std::move(task));
return;
@@ -318,7 +254,7 @@
}
}
-void TaskQueue::Impl::RunPendingTasks() {
+void TaskQueueWin::RunPendingTasks() {
while (true) {
std::unique_ptr<QueuedTask> task;
{
@@ -335,12 +271,12 @@
}
// static
-void TaskQueue::Impl::ThreadMain(void* context) {
- ThreadState state(static_cast<TaskQueue::Impl*>(context)->in_queue_);
- state.RunThreadMain();
+void TaskQueueWin::ThreadMain(void* context) {
+ static_cast<TaskQueueWin*>(context)->RunThreadMain();
}
-void TaskQueue::Impl::ThreadState::RunThreadMain() {
+void TaskQueueWin::RunThreadMain() {
+ CurrentTaskQueueSetter set_current(this);
HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};
while (true) {
// Make sure we do an alertable wait as that's required to allow APCs to run
@@ -366,12 +302,12 @@
if (result == (WAIT_OBJECT_0 + 1)) {
::ResetEvent(in_queue_);
- TaskQueue::Impl::Current()->RunPendingTasks();
+ RunPendingTasks();
}
}
}
-bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() {
+bool TaskQueueWin::ProcessQueuedMessages() {
MSG msg = {};
// To protect against overly busy message queues, we limit the time
// we process tasks to a few milliseconds. If we don't do that, there's
@@ -425,7 +361,7 @@
return msg.message != WM_QUIT;
}
-void TaskQueue::Impl::ThreadState::RunDueTasks() {
+void TaskQueueWin::RunDueTasks() {
RTC_DCHECK(!timer_tasks_.empty());
auto now = GetTick();
do {
@@ -437,7 +373,7 @@
} while (!timer_tasks_.empty());
}
-void TaskQueue::Impl::ThreadState::ScheduleNextTimer() {
+void TaskQueueWin::ScheduleNextTimer() {
RTC_DCHECK_EQ(timer_id_, 0);
if (timer_tasks_.empty())
return;
@@ -449,7 +385,7 @@
timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
}
-void TaskQueue::Impl::ThreadState::CancelTimers() {
+void TaskQueueWin::CancelTimers() {
timer_.Cancel();
if (timer_id_) {
::KillTimer(nullptr, timer_id_);
@@ -457,30 +393,20 @@
}
}
-// Boilerplate for the PIMPL pattern.
-TaskQueue::TaskQueue(const char* queue_name, Priority priority)
- : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
+class TaskQueueWinFactory : public TaskQueueFactory {
+ public:
+ std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+ absl::string_view name,
+ Priority priority) const override {
+ return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+ new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
+ }
+};
+
+} // namespace
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
+ return absl::make_unique<TaskQueueWinFactory>();
}
-TaskQueue::~TaskQueue() {}
-
-// static
-TaskQueue* TaskQueue::Current() {
- return TaskQueue::Impl::CurrentQueue();
-}
-
-// Used for DCHECKing the current queue.
-bool TaskQueue::IsCurrent() const {
- return impl_->IsCurrent();
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
- return TaskQueue::impl_->PostTask(std::move(task));
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
-}
-
-} // namespace rtc
+} // namespace webrtc