This CL refactors TaskQueue to use a PIMPL implementation on Linux/Android.

In later steps the Win/Mac implementation will also be refactored.

The rtc_task_queue target is split up into three separate targets:

rtc_task_queue_api:
Contains the header file task_queue.h but no implementation.
Only external TaskQueue implementations should directly depend on this target.

rtc_task_queue_impl:
Contains the default implementation of task_queue.h.
Only external application targets should directly depend on this target.

rtc_task_queue:
WebRTC targets should depend on this target. It unconditionally depends on rtc_task_queue_api and, depending on the new build flag |rtc_link_task_queue_impl|, also depends on rtc_task_queue_impl.

BUG=webrtc:8160

Review-Url: https://codereview.webrtc.org/3003643002
Cr-Commit-Position: refs/heads/master@{#19516}
diff --git a/webrtc/rtc_base/task_queue_libevent.cc b/webrtc/rtc_base/task_queue_libevent.cc
index db267dc..99b88df 100644
--- a/webrtc/rtc_base/task_queue_libevent.cc
+++ b/webrtc/rtc_base/task_queue_libevent.cc
@@ -18,7 +18,11 @@
 #include "base/third_party/libevent/event.h"
 #include "webrtc/rtc_base/checks.h"
 #include "webrtc/rtc_base/logging.h"
+#include "webrtc/rtc_base/platform_thread.h"
+#include "webrtc/rtc_base/refcount.h"
+#include "webrtc/rtc_base/refcountedobject.h"
 #include "webrtc/rtc_base/safe_conversions.h"
+#include "webrtc/rtc_base/task_queue.h"
 #include "webrtc/rtc_base/task_queue_posix.h"
 #include "webrtc/rtc_base/timeutils.h"
 
@@ -104,9 +108,57 @@
 }
 }  // namespace
 
-struct TaskQueue::QueueContext {
-  explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
-  TaskQueue* queue;
+class TaskQueue::Impl : public RefCountInterface {
+ public:
+  explicit Impl(const char* queue_name,
+                TaskQueue* queue,
+                Priority priority = Priority::NORMAL);
+  ~Impl() override;
+
+  static TaskQueue::Impl* Current();
+  static TaskQueue* CurrentQueue();
+
+  // Used for DCHECKing the current queue.
+  static bool IsCurrent(const char* queue_name);
+  bool IsCurrent() const;
+
+  void PostTask(std::unique_ptr<QueuedTask> task);
+  void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                        std::unique_ptr<QueuedTask> reply,
+                        TaskQueue::Impl* reply_queue);
+
+  void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+
+ private:
+  static void ThreadMain(void* context);
+  static void OnWakeup(int socket, short flags, void* context);  // NOLINT
+  static void RunTask(int fd, short flags, void* context);       // NOLINT
+  static void RunTimer(int fd, short flags, void* context);      // NOLINT
+
+  class ReplyTaskOwner;
+  class PostAndReplyTask;
+  class SetTimerTask;
+
+  typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
+
+  void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
+
+  struct QueueContext;
+  TaskQueue* const queue_;
+  int wakeup_pipe_in_ = -1;
+  int wakeup_pipe_out_ = -1;
+  event_base* event_base_;
+  std::unique_ptr<event> wakeup_event_;
+  PlatformThread thread_;
+  rtc::CriticalSection pending_lock_;
+  std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
+  std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
+      GUARDED_BY(pending_lock_);
+};
+
+struct TaskQueue::Impl::QueueContext {
+  explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
+  TaskQueue::Impl* queue;
   bool is_active;
   // Holds a list of events pending timers for cleanup when the loop exits.
   std::list<TimerEvent*> pending_timers_;
@@ -135,7 +187,7 @@
 //     * if set_should_run_task() was called, the reply task will be run
 //   * Release the reference to ReplyTaskOwner
 //   * ReplyTaskOwner and associated |reply_| are deleted.
-class TaskQueue::ReplyTaskOwner {
+class TaskQueue::Impl::ReplyTaskOwner {
  public:
   ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
       : reply_(std::move(reply)) {}
@@ -159,11 +211,11 @@
   bool run_task_ = false;
 };
 
-class TaskQueue::PostAndReplyTask : public QueuedTask {
+class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
  public:
   PostAndReplyTask(std::unique_ptr<QueuedTask> task,
                    std::unique_ptr<QueuedTask> reply,
-                   TaskQueue* reply_queue,
+                   TaskQueue::Impl* reply_queue,
                    int reply_pipe)
       : task_(std::move(task)),
         reply_pipe_(reply_pipe),
@@ -196,7 +248,7 @@
   scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
 };
 
-class TaskQueue::SetTimerTask : public QueuedTask {
+class TaskQueue::Impl::SetTimerTask : public QueuedTask {
  public:
   SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
       : task_(std::move(task)),
@@ -208,7 +260,7 @@
     // Compensate for the time that has passed since construction
     // and until we got here.
     uint32_t post_time = Time32() - posted_;
-    TaskQueue::Current()->PostDelayedTask(
+    TaskQueue::Impl::Current()->PostDelayedTask(
         std::move(task_),
         post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
     return true;
@@ -219,10 +271,13 @@
   const uint32_t posted_;
 };
 
-TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
-    : event_base_(event_base_new()),
+TaskQueue::Impl::Impl(const char* queue_name,
+                      TaskQueue* queue,
+                      Priority priority /*= NORMAL*/)
+    : queue_(queue),
+      event_base_(event_base_new()),
       wakeup_event_(new event()),
-      thread_(&TaskQueue::ThreadMain,
+      thread_(&TaskQueue::Impl::ThreadMain,
               this,
               queue_name,
               TaskQueuePriorityToThreadPriority(priority)) {
@@ -240,7 +295,7 @@
   thread_.Start();
 }
 
-TaskQueue::~TaskQueue() {
+TaskQueue::Impl::~Impl() {
   RTC_DCHECK(!IsCurrent());
   struct timespec ts;
   char message = kQuit;
@@ -267,29 +322,38 @@
 }
 
 // static
-TaskQueue* TaskQueue::Current() {
+TaskQueue::Impl* TaskQueue::Impl::Current() {
   QueueContext* ctx =
       static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
   return ctx ? ctx->queue : nullptr;
 }
 
 // static
-bool TaskQueue::IsCurrent(const char* queue_name) {
-  TaskQueue* current = Current();
+TaskQueue* TaskQueue::Impl::CurrentQueue() {
+  TaskQueue::Impl* current = Current();
+  if (current) {
+    return current->queue_;
+  }
+  return nullptr;
+}
+
+// static
+bool TaskQueue::Impl::IsCurrent(const char* queue_name) {
+  TaskQueue::Impl* current = Current();
   return current && current->thread_.name().compare(queue_name) == 0;
 }
 
-bool TaskQueue::IsCurrent() const {
+bool TaskQueue::Impl::IsCurrent() const {
   return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
 }
 
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
   RTC_DCHECK(task.get());
   // libevent isn't thread safe.  This means that we can't use methods such
   // as event_base_once to post tasks to the worker thread from a different
   // thread.  However, we can use it when posting from the worker thread itself.
   if (IsCurrent()) {
-    if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
+    if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
                         task.get(), nullptr) == 0) {
       task.release();
     }
@@ -310,11 +374,12 @@
   }
 }
 
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                uint32_t milliseconds) {
+void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                      uint32_t milliseconds) {
   if (IsCurrent()) {
     TimerEvent* timer = new TimerEvent(std::move(task));
-    EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
+    EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
+                timer);
     QueueContext* ctx =
         static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
     ctx->pending_timers_.push_back(timer);
@@ -327,23 +392,18 @@
   }
 }
 
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
-                                 std::unique_ptr<QueuedTask> reply,
-                                 TaskQueue* reply_queue) {
+void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                       std::unique_ptr<QueuedTask> reply,
+                                       TaskQueue::Impl* reply_queue) {
   std::unique_ptr<QueuedTask> wrapper_task(
       new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
                            reply_queue->wakeup_pipe_in_));
   PostTask(std::move(wrapper_task));
 }
 
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
-                                 std::unique_ptr<QueuedTask> reply) {
-  return PostTaskAndReply(std::move(task), std::move(reply), Current());
-}
-
 // static
-void TaskQueue::ThreadMain(void* context) {
-  TaskQueue* me = static_cast<TaskQueue*>(context);
+void TaskQueue::Impl::ThreadMain(void* context) {
+  TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
 
   QueueContext queue_context(me);
   pthread_setspecific(GetQueuePtrTls(), &queue_context);
@@ -358,7 +418,9 @@
 }
 
 // static
-void TaskQueue::OnWakeup(int socket, short flags, void* context) {  // NOLINT
+void TaskQueue::Impl::OnWakeup(int socket,
+                               short flags,
+                               void* context) {  // NOLINT
   QueueContext* ctx =
       static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
   RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
@@ -405,14 +467,14 @@
 }
 
 // static
-void TaskQueue::RunTask(int fd, short flags, void* context) {  // NOLINT
+void TaskQueue::Impl::RunTask(int fd, short flags, void* context) {  // NOLINT
   auto* task = static_cast<QueuedTask*>(context);
   if (task->Run())
     delete task;
 }
 
 // static
-void TaskQueue::RunTimer(int fd, short flags, void* context) {  // NOLINT
+void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) {  // NOLINT
   TimerEvent* timer = static_cast<TimerEvent*>(context);
   if (!timer->task->Run())
     timer->task.release();
@@ -422,10 +484,54 @@
   delete timer;
 }
 
-void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
+void TaskQueue::Impl::PrepareReplyTask(
+    scoped_refptr<ReplyTaskOwnerRef> reply_task) {
   RTC_DCHECK(reply_task);
   CritScope lock(&pending_lock_);
   pending_replies_.push_back(std::move(reply_task));
 }
 
+TaskQueue::TaskQueue(const char* queue_name, Priority priority)
+    : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
+}
+
+TaskQueue::~TaskQueue() {}
+
+// static
+TaskQueue* TaskQueue::Current() {
+  return TaskQueue::Impl::CurrentQueue();
+}
+
+// Used for DCHECKing the current queue.
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+  return TaskQueue::Impl::IsCurrent(queue_name);
+}
+
+bool TaskQueue::IsCurrent() const {
+  return impl_->IsCurrent();
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+  return TaskQueue::impl_->PostTask(std::move(task));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply,
+                                 TaskQueue* reply_queue) {
+  return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
+                                            reply_queue->impl_.get());
+}
+
+// Replies on the caller's queue (Current()), matching the pre-PIMPL behavior.
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply) {
+  return PostTaskAndReply(std::move(task), std::move(reply), Current());
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                uint32_t milliseconds) {
+  return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
+}
+
 }  // namespace rtc