Migrate win task queue to TaskQueueBase interface

Bug: webrtc:10191
Change-Id: I498c4187883206d7082d9f7323575f087e041370
Reviewed-on: https://webrtc-review.googlesource.com/c/123485
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26791}
diff --git a/rtc_base/task_queue_win.cc b/rtc_base/task_queue_win.cc
index b73aa95..696eda3 100644
--- a/rtc_base/task_queue_win.cc
+++ b/rtc_base/task_queue_win.cc
@@ -8,7 +8,7 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
-#include "rtc_base/task_queue.h"
+#include "rtc_base/task_queue_win.h"
 
 // clang-format off
 // clang formating would change include order.
@@ -27,6 +27,10 @@
 #include <queue>
 #include <utility>
 
+#include "absl/memory/memory.h"
+#include "absl/strings/string_view.h"
+#include "api/task_queue/queued_task.h"
+#include "api/task_queue/task_queue_base.h"
 #include "rtc_base/arraysize.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/critical_section.h"
@@ -34,62 +38,40 @@
 #include "rtc_base/logging.h"
 #include "rtc_base/numerics/safe_conversions.h"
 #include "rtc_base/platform_thread.h"
-#include "rtc_base/ref_count.h"
-#include "rtc_base/ref_counted_object.h"
 #include "rtc_base/time_utils.h"
 
-namespace rtc {
+namespace webrtc {
 namespace {
 #define WM_RUN_TASK WM_USER + 1
 #define WM_QUEUE_DELAYED_TASK WM_USER + 2
 
-using Priority = TaskQueue::Priority;
-
-DWORD g_queue_ptr_tls = 0;
-
-BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
-  g_queue_ptr_tls = TlsAlloc();
-  return TRUE;
-}
-
-DWORD GetQueuePtrTls() {
-  static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
-  ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
-  return g_queue_ptr_tls;
-}
-
-struct ThreadStartupData {
-  Event* started;
-  void* thread_context;
-};
-
 void CALLBACK InitializeQueueThread(ULONG_PTR param) {
   MSG msg;
   ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
-  ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
-  ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
-  data->started->Set();
+  rtc::Event* data = reinterpret_cast<rtc::Event*>(param);
+  data->Set();
 }
 
-ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+rtc::ThreadPriority TaskQueuePriorityToThreadPriority(
+    TaskQueueFactory::Priority priority) {
   switch (priority) {
-    case Priority::HIGH:
-      return kRealtimePriority;
-    case Priority::LOW:
-      return kLowPriority;
-    case Priority::NORMAL:
-      return kNormalPriority;
+    case TaskQueueFactory::Priority::HIGH:
+      return rtc::kRealtimePriority;
+    case TaskQueueFactory::Priority::LOW:
+      return rtc::kLowPriority;
+    case TaskQueueFactory::Priority::NORMAL:
+      return rtc::kNormalPriority;
     default:
       RTC_NOTREACHED();
       break;
   }
-  return kNormalPriority;
+  return rtc::kNormalPriority;
 }
 
 int64_t GetTick() {
   static const UINT kPeriod = 1;
   bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
-  int64_t ret = TimeMillis();
+  int64_t ret = rtc::TimeMillis();
   if (high_res)
     timeEndPeriod(kPeriod);
   return ret;
@@ -168,81 +150,56 @@
   RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
 };
 
-}  // namespace
-
-class TaskQueue::Impl : public RefCountInterface {
+class TaskQueueWin : public TaskQueueBase {
  public:
-  Impl(const char* queue_name, TaskQueue* queue, Priority priority);
-  ~Impl() override;
+  TaskQueueWin(absl::string_view queue_name, rtc::ThreadPriority priority);
+  ~TaskQueueWin() override = default;
 
-  static TaskQueue::Impl* Current();
-  static TaskQueue* CurrentQueue();
-
-  // Used for DCHECKing the current queue.
-  bool IsCurrent() const;
-
-  template <class Closure,
-            typename std::enable_if<!std::is_convertible<
-                Closure,
-                std::unique_ptr<QueuedTask>>::value>::type* = nullptr>
-  void PostTask(Closure&& closure) {
-    PostTask(NewClosure(std::forward<Closure>(closure)));
-  }
-
-  void PostTask(std::unique_ptr<QueuedTask> task);
-  void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+  void Delete() override;
+  void PostTask(std::unique_ptr<QueuedTask> task) override;
+  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                       uint32_t milliseconds) override;
 
   void RunPendingTasks();
 
  private:
   static void ThreadMain(void* context);
 
-  class WorkerThread : public PlatformThread {
+  class WorkerThread : public rtc::PlatformThread {
    public:
-    WorkerThread(ThreadRunFunction func,
+    WorkerThread(rtc::ThreadRunFunction func,
                  void* obj,
-                 const char* thread_name,
-                 ThreadPriority priority)
+                 absl::string_view thread_name,
+                 rtc::ThreadPriority priority)
         : PlatformThread(func, obj, thread_name, priority) {}
 
     bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
-      return PlatformThread::QueueAPC(apc_function, data);
+      return rtc::PlatformThread::QueueAPC(apc_function, data);
     }
   };
 
-  class ThreadState {
-   public:
-    explicit ThreadState(HANDLE in_queue) : in_queue_(in_queue) {}
-    ~ThreadState() {}
+  void RunThreadMain();
+  bool ProcessQueuedMessages();
+  void RunDueTasks();
+  void ScheduleNextTimer();
+  void CancelTimers();
 
-    void RunThreadMain();
-
-   private:
-    bool ProcessQueuedMessages();
-    void RunDueTasks();
-    void ScheduleNextTimer();
-    void CancelTimers();
-
-    // Since priority_queue<> by defult orders items in terms of
-    // largest->smallest, using std::less<>, and we want smallest->largest,
-    // we would like to use std::greater<> here. Alas it's only available in
-    // C++14 and later, so we roll our own compare template that that relies on
-    // operator<().
-    template <typename T>
-    struct greater {
-      bool operator()(const T& l, const T& r) { return l > r; }
-    };
-
-    MultimediaTimer timer_;
-    std::priority_queue<DelayedTaskInfo,
-                        std::vector<DelayedTaskInfo>,
-                        greater<DelayedTaskInfo>>
-        timer_tasks_;
-    UINT_PTR timer_id_ = 0;
-    HANDLE in_queue_;
+  // Since priority_queue<> by default orders items in terms of
+  // largest->smallest, using std::less<>, and we want smallest->largest,
+  // we would like to use std::greater<> here. Alas it's only available in
+  // C++14 and later, so we roll our own compare template that relies on
+  // operator>().
+  template <typename T>
+  struct greater {
+    bool operator()(const T& l, const T& r) { return l > r; }
   };
 
-  TaskQueue* const queue_;
+  MultimediaTimer timer_;
+  std::priority_queue<DelayedTaskInfo,
+                      std::vector<DelayedTaskInfo>,
+                      greater<DelayedTaskInfo>>
+      timer_tasks_;
+  UINT_PTR timer_id_ = 0;
   WorkerThread thread_;
   rtc::CriticalSection pending_lock_;
   std::queue<std::unique_ptr<QueuedTask>> pending_
@@ -250,26 +207,19 @@
   HANDLE in_queue_;
 };
 
-TaskQueue::Impl::Impl(const char* queue_name,
-                      TaskQueue* queue,
-                      Priority priority)
-    : queue_(queue),
-      thread_(&TaskQueue::Impl::ThreadMain,
-              this,
-              queue_name,
-              TaskQueuePriorityToThreadPriority(priority)),
+TaskQueueWin::TaskQueueWin(absl::string_view queue_name,
+                           rtc::ThreadPriority priority)
+    : thread_(&TaskQueueWin::ThreadMain, this, queue_name, priority),
       in_queue_(::CreateEvent(nullptr, true, false, nullptr)) {
-  RTC_DCHECK(queue_name);
   RTC_DCHECK(in_queue_);
   thread_.Start();
-  Event event(false, false);
-  ThreadStartupData startup = {&event, this};
+  rtc::Event event(false, false);
   RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
-                             reinterpret_cast<ULONG_PTR>(&startup)));
-  event.Wait(Event::kForever);
+                             reinterpret_cast<ULONG_PTR>(&event)));
+  event.Wait(rtc::Event::kForever);
 }
 
-TaskQueue::Impl::~Impl() {
+void TaskQueueWin::Delete() {
   RTC_DCHECK(!IsCurrent());
   while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
     RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
@@ -277,31 +227,17 @@
   }
   thread_.Stop();
   ::CloseHandle(in_queue_);
+  delete this;
 }
 
-// static
-TaskQueue::Impl* TaskQueue::Impl::Current() {
-  return static_cast<TaskQueue::Impl*>(::TlsGetValue(GetQueuePtrTls()));
-}
-
-// static
-TaskQueue* TaskQueue::Impl::CurrentQueue() {
-  TaskQueue::Impl* current = Current();
-  return current ? current->queue_ : nullptr;
-}
-
-bool TaskQueue::Impl::IsCurrent() const {
-  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
-}
-
-void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueWin::PostTask(std::unique_ptr<QueuedTask> task) {
   rtc::CritScope lock(&pending_lock_);
   pending_.push(std::move(task));
   ::SetEvent(in_queue_);
 }
 
-void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                      uint32_t milliseconds) {
+void TaskQueueWin::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                   uint32_t milliseconds) {
   if (!milliseconds) {
     PostTask(std::move(task));
     return;
@@ -318,7 +254,7 @@
   }
 }
 
-void TaskQueue::Impl::RunPendingTasks() {
+void TaskQueueWin::RunPendingTasks() {
   while (true) {
     std::unique_ptr<QueuedTask> task;
     {
@@ -335,12 +271,12 @@
 }
 
 // static
-void TaskQueue::Impl::ThreadMain(void* context) {
-  ThreadState state(static_cast<TaskQueue::Impl*>(context)->in_queue_);
-  state.RunThreadMain();
+void TaskQueueWin::ThreadMain(void* context) {
+  static_cast<TaskQueueWin*>(context)->RunThreadMain();
 }
 
-void TaskQueue::Impl::ThreadState::RunThreadMain() {
+void TaskQueueWin::RunThreadMain() {
+  CurrentTaskQueueSetter set_current(this);
   HANDLE handles[2] = {*timer_.event_for_wait(), in_queue_};
   while (true) {
     // Make sure we do an alertable wait as that's required to allow APCs to run
@@ -366,12 +302,12 @@
 
     if (result == (WAIT_OBJECT_0 + 1)) {
       ::ResetEvent(in_queue_);
-      TaskQueue::Impl::Current()->RunPendingTasks();
+      RunPendingTasks();
     }
   }
 }
 
-bool TaskQueue::Impl::ThreadState::ProcessQueuedMessages() {
+bool TaskQueueWin::ProcessQueuedMessages() {
   MSG msg = {};
   // To protect against overly busy message queues, we limit the time
   // we process tasks to a few milliseconds. If we don't do that, there's
@@ -425,7 +361,7 @@
   return msg.message != WM_QUIT;
 }
 
-void TaskQueue::Impl::ThreadState::RunDueTasks() {
+void TaskQueueWin::RunDueTasks() {
   RTC_DCHECK(!timer_tasks_.empty());
   auto now = GetTick();
   do {
@@ -437,7 +373,7 @@
   } while (!timer_tasks_.empty());
 }
 
-void TaskQueue::Impl::ThreadState::ScheduleNextTimer() {
+void TaskQueueWin::ScheduleNextTimer() {
   RTC_DCHECK_EQ(timer_id_, 0);
   if (timer_tasks_.empty())
     return;
@@ -449,7 +385,7 @@
     timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
 }
 
-void TaskQueue::Impl::ThreadState::CancelTimers() {
+void TaskQueueWin::CancelTimers() {
   timer_.Cancel();
   if (timer_id_) {
     ::KillTimer(nullptr, timer_id_);
@@ -457,30 +393,20 @@
   }
 }
 
-// Boilerplate for the PIMPL pattern.
-TaskQueue::TaskQueue(const char* queue_name, Priority priority)
-    : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
+class TaskQueueWinFactory : public TaskQueueFactory {
+ public:
+  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+      absl::string_view name,
+      Priority priority) const override {
+    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+        new TaskQueueWin(name, TaskQueuePriorityToThreadPriority(priority)));
+  }
+};
+
+}  // namespace
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueWinFactory() {
+  return absl::make_unique<TaskQueueWinFactory>();
 }
 
-TaskQueue::~TaskQueue() {}
-
-// static
-TaskQueue* TaskQueue::Current() {
-  return TaskQueue::Impl::CurrentQueue();
-}
-
-// Used for DCHECKing the current queue.
-bool TaskQueue::IsCurrent() const {
-  return impl_->IsCurrent();
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
-  return TaskQueue::impl_->PostTask(std::move(task));
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                uint32_t milliseconds) {
-  return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
-}
-
-}  // namespace rtc
+}  // namespace webrtc