Reland of New task queueing primitive for async tasks: TaskQueue.

New task queueing primitive for async tasks: TaskQueue.
TaskQueue is a new way to execute tasks asynchronously and sequentially
in a thread-safe manner with minimal locking.  The implementation
uses OS-supported APIs that are compatible with asynchronous I/O
notifications from sources such as sockets and files.
This class is part of rtc_base_approved, so it can be used by both
the webrtc and libjingle parts of the WebRTC library.  Moving forward,
we can replace rtc::Thread and webrtc::ProcessThread with this implementation.
NOTE: Do not assume that all tasks that execute on a TaskQueue run on
the same thread.  For example, on Mac and iOS we use GCD dispatch queues,
which means that tasks might execute on different threads depending on
what is most efficient.

TBR=perkj@webrtc.org,phoglund@webrtc.org

Review-Url: https://codereview.webrtc.org/1984503002
Cr-Commit-Position: refs/heads/master@{#12749}
diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc
new file mode 100644
index 0000000..5ae6d92
--- /dev/null
+++ b/webrtc/base/task_queue_win.cc
@@ -0,0 +1,184 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/base/task_queue.h"
+
+#include <string.h>
+#include <unordered_map>
+
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+
+namespace rtc {
+namespace {
+// Private thread-message IDs posted to the queue thread. The expansions are
+// parenthesized so each macro behaves as a single value inside any larger
+// expression (e.g. `WM_RUN_TASK * 2` or `x == WM_RUN_TASK + 0`).
+#define WM_RUN_TASK (WM_USER + 1)
+#define WM_QUEUE_DELAYED_TASK (WM_USER + 2)
+
+// TLS slot holding the TaskQueue* that owns the current thread. It is
+// allocated lazily, exactly once, by GetQueuePtrTls().
+DWORD g_queue_ptr_tls = 0;
+
+// InitOnceExecuteOnce() callback that allocates the TLS slot. The parameters
+// are dictated by the PINIT_ONCE_FN signature and are unused here.
+BOOL CALLBACK InitializeTls(PINIT_ONCE /*init_once*/,
+                            void* /*param*/,
+                            void** /*context*/) {
+  g_queue_ptr_tls = TlsAlloc();
+  // Without a valid slot TaskQueue::Current() can never work; fail loudly
+  // instead of later reading/writing the invalid TLS_OUT_OF_INDEXES index.
+  RTC_CHECK(g_queue_ptr_tls != TLS_OUT_OF_INDEXES);
+  return TRUE;
+}
+
+// Returns the TLS index that stores the current thread's TaskQueue*,
+// allocating it on first use. InitOnceExecuteOnce() serializes the one-time
+// allocation across threads before any caller reads g_queue_ptr_tls.
+DWORD GetQueuePtrTls() {
+  static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
+  RTC_CHECK(InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr));
+  return g_queue_ptr_tls;
+}
+
+// Handshake data handed (by address) to InitializeQueueThread via an APC.
+struct ThreadStartupData {
+  Event* started;        // Signaled once the queue thread is initialized.
+  void* thread_context;  // Value stored in the queue thread's TLS slot.
+};
+
+// APC executed on the queue thread while a TaskQueue is being constructed.
+// It makes sure the thread has a message queue, records the owning queue in
+// TLS, and then wakes the constructor that is waiting on |started|.
+void CALLBACK InitializeQueueThread(ULONG_PTR param) {
+  // A thread's message queue is created lazily; a PM_NOREMOVE peek forces it
+  // into existence so that PostThreadMessage() to this thread can succeed.
+  MSG unused_msg;
+  ::PeekMessage(&unused_msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
+  const ThreadStartupData* const startup =
+      reinterpret_cast<const ThreadStartupData*>(param);
+  ::TlsSetValue(GetQueuePtrTls(), startup->thread_context);
+  // Must be last: |startup| lives on the constructor's stack.
+  startup->started->Set();
+}
+}  // namespace
+
+TaskQueue::TaskQueue(const char* queue_name)
+    : thread_(&TaskQueue::ThreadMain, this, queue_name) {
+  RTC_DCHECK(queue_name);
+  thread_.Start();
+
+  // Have InitializeQueueThread run on the new thread (queued as an APC) and
+  // block until it has created the thread's message queue and registered
+  // |this| in TLS, so Post*() and Current() work once the constructor returns.
+  Event initialized(false, false);
+  ThreadStartupData startup_data = {&initialized, this};
+  const bool apc_queued = thread_.QueueAPC(
+      &InitializeQueueThread, reinterpret_cast<ULONG_PTR>(&startup_data));
+  RTC_CHECK(apc_queued);
+  initialized.Wait(Event::kForever);
+}
+
+TaskQueue::~TaskQueue() {
+  // Must not run on the queue's own thread: Stop() below presumably waits for
+  // that thread to finish, which could never happen from inside it.
+  RTC_DCHECK(!IsCurrent());
+  // WM_QUIT ends the thread's message loop. PostThreadMessage() fails with
+  // ERROR_NOT_ENOUGH_QUOTA while the target's message queue is full, so retry
+  // in that case; any other failure is fatal.
+  for (;;) {
+    if (PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0))
+      break;
+    RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError());
+    Sleep(1);
+  }
+  thread_.Stop();
+}
+
+// static
+TaskQueue* TaskQueue::Current() {
+  // Queue threads store their owning TaskQueue in this slot during startup;
+  // on any other thread the slot was never set and reads back as null.
+  void* const queue = TlsGetValue(GetQueuePtrTls());
+  return static_cast<TaskQueue*>(queue);
+}
+
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+  // Matches by name only, so queues that share a name are indistinguishable.
+  TaskQueue* const queue = Current();
+  if (!queue)
+    return false;
+  return queue->thread_.name().compare(queue_name) == 0;
+}
+
+bool TaskQueue::IsCurrent() const {
+  // Each queue owns exactly one thread, so comparing thread identity suffices.
+  return IsThreadRefEqual(CurrentThreadRef(), thread_.GetThreadRef());
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+  // Ownership travels to ThreadMain through the message's LPARAM. If posting
+  // fails (e.g. the thread's message queue is full), |task| keeps ownership
+  // and the task is dropped when it goes out of scope.
+  QueuedTask* const raw_task = task.get();
+  const BOOL posted = PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
+                                        reinterpret_cast<LPARAM>(raw_task));
+  if (posted)
+    task.release();
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                uint32_t milliseconds) {
+  // The low 32 bits of WPARAM carry the requested delay. On Win64 the
+  // otherwise unused high 32 bits also carry the GetTickCount() value at post
+  // time, letting ThreadMain subtract how long the message sat in the queue.
+  // GetTickCount() has coarse resolution (typically 10-16 ms), so this
+  // compensation is only approximate.
+  WPARAM wparam = milliseconds;
+#if defined(_WIN64)
+  wparam |= static_cast<WPARAM>(::GetTickCount()) << 32;
+#endif
+  if (!PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
+                         reinterpret_cast<LPARAM>(task.get()))) {
+    return;  // Posting failed: |task| still owns the task and drops it.
+  }
+  task.release();
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply,
+                                 TaskQueue* reply_queue) {
+  // Runs |task| on this queue, then posts |reply| to |reply_queue|'s thread.
+  // Both tasks stay in unique_ptrs until ownership is handed off, so they are
+  // freed rather than leaked if this wrapper is destroyed without running
+  // (e.g. when posting it to this queue fails because the queue is full).
+  class PostAndReplyTask : public QueuedTask {
+   public:
+    PostAndReplyTask(std::unique_ptr<QueuedTask> main_task,
+                     std::unique_ptr<QueuedTask> reply_task,
+                     DWORD reply_thread_id)
+        : task_(std::move(main_task)),
+          reply_(std::move(reply_task)),
+          reply_thread_id_(reply_thread_id) {}
+
+    bool Run() override {
+      // Destroy |task_| before posting the reply, as PostTask() would. A task
+      // returning false has taken ownership of itself, so just let it go.
+      if (task_->Run())
+        task_.reset();
+      else
+        task_.release();
+      // If the reply thread's message queue is full, the reply cannot be
+      // queued and is dropped (deleted together with this wrapper).
+      if (PostThreadMessage(reply_thread_id_, WM_RUN_TASK, 0,
+                            reinterpret_cast<LPARAM>(reply_.get()))) {
+        reply_.release();
+      }
+      return true;
+    }
+
+   private:
+    std::unique_ptr<QueuedTask> task_;
+    std::unique_ptr<QueuedTask> reply_;
+    const DWORD reply_thread_id_;
+  };
+
+  // Current() returns null on threads that are not TaskQueue threads, so a
+  // missing reply queue must be caught here rather than dereferenced.
+  RTC_DCHECK(reply_queue);
+  const DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
+  PostTask(std::unique_ptr<QueuedTask>(new PostAndReplyTask(
+      std::move(task), std::move(reply), reply_thread_id)));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply) {
+  // Deliver the reply to whichever TaskQueue the caller is running on.
+  TaskQueue* const caller_queue = Current();
+  PostTaskAndReply(std::move(task), std::move(reply), caller_queue);
+}
+
+// static
+// Message loop of the queue thread. Returns false once WM_QUIT (posted by the
+// destructor) is received, which ends the thread.
+bool TaskQueue::ThreadMain(void* context) {
+  // Delayed tasks waiting for their timer, keyed by SetTimer() id. Any still
+  // pending when the loop exits are deleted together with the map.
+  std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>> delayed_tasks;
+
+  while (true) {
+    // GetMessage() waits non-alertably: an APC queued after this thread has
+    // started (such as InitializeQueueThread, queued by the constructor)
+    // would never run, and the constructor would block forever. Wait
+    // alertably instead. MWMO_INPUTAVAILABLE also wakes us for input that was
+    // only peeked at, e.g. by the PM_NOREMOVE peek in InitializeQueueThread.
+    const DWORD result = ::MsgWaitForMultipleObjectsEx(
+        0, nullptr, INFINITE, QS_ALLINPUT,
+        MWMO_ALERTABLE | MWMO_INPUTAVAILABLE);
+    RTC_CHECK(result != WAIT_FAILED);
+
+    // Drain everything that is queued. After WAIT_IO_COMPLETION (an APC ran)
+    // this is simply a no-op when nothing has been posted.
+    MSG msg;
+    while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE)) {
+      if (msg.message == WM_QUIT)
+        return false;
+
+      if (msg.hwnd) {
+        ::TranslateMessage(&msg);
+        ::DispatchMessage(&msg);
+        continue;
+      }
+
+      switch (msg.message) {
+        case WM_RUN_TASK: {
+          QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
+          // A task returning false has taken ownership of itself.
+          if (task->Run())
+            delete task;
+          break;
+        }
+        case WM_QUEUE_DELAYED_TASK: {
+          std::unique_ptr<QueuedTask> task(
+              reinterpret_cast<QueuedTask*>(msg.lParam));
+          uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
+#if defined(_WIN64)
+          // Subtract the time the message spent in the queue. The DWORD
+          // subtraction stays correct across a GetTickCount() wraparound.
+          const DWORD now = ::GetTickCount();
+          const DWORD elapsed = now - static_cast<DWORD>(msg.wParam >> 32);
+          milliseconds = elapsed > milliseconds ? 0 : milliseconds - elapsed;
+#endif
+          const UINT_PTR timer_id =
+              ::SetTimer(nullptr, 0, milliseconds, nullptr);
+          if (timer_id == 0) {
+            // No timer available: drop the task (deleted by |task|), just as
+            // Post*() drops tasks when a message queue is full.
+            break;
+          }
+          delayed_tasks.emplace(timer_id, std::move(task));
+          break;
+        }
+        case WM_TIMER: {
+          auto found = delayed_tasks.find(msg.wParam);
+          if (found == delayed_tasks.end()) {
+            // Not one of our timers (e.g. a thread timer with a TIMERPROC set
+            // by a task): DispatchMessage() invokes its callback, if any.
+            ::DispatchMessage(&msg);
+            break;
+          }
+          // Thread timers repeat until killed; ours are single-shot.
+          ::KillTimer(nullptr, msg.wParam);
+          std::unique_ptr<QueuedTask> task = std::move(found->second);
+          delayed_tasks.erase(found);
+          if (!task->Run())
+            task.release();
+          break;
+        }
+        default:
+          RTC_NOTREACHED();
+          break;
+      }
+    }
+  }
+}
+}  // namespace rtc