Fix potential deadlock in TaskQueue's libevent PostTaskAndReply implementation

BUG=webrtc:7188

Review-Url: https://codereview.webrtc.org/2709603002
Cr-Commit-Position: refs/heads/master@{#16786}
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
index c802588..c6ce5b3 100644
--- a/webrtc/base/task_queue_libevent.cc
+++ b/webrtc/base/task_queue_libevent.cc
@@ -11,6 +11,7 @@
 #include "webrtc/base/task_queue.h"
 
 #include <fcntl.h>
+#include <signal.h>
 #include <string.h>
 #include <unistd.h>
 
@@ -27,6 +28,28 @@
 namespace {
 static const char kQuit = 1;
 static const char kRunTask = 2;
+static const char kRunReplyTask = 3;
+
+// This ignores the SIGPIPE signal on the calling thread.
+// This signal can be fired when trying to write() to a pipe that's being
+// closed or while closing a pipe that's being written to.
+// We can run into that situation (e.g. reply tasks that don't get a chance to
+// run because the task queue is being deleted) so we ignore this signal and
+// continue as normal.
+// As a side note for this implementation, it would be great if we could safely
+// restore the sigmask, but unfortunately the operation of restoring it, can
+// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
+// The SIGPIPE signal by default causes the process to be terminated, so we
+// don't want to risk that.
+// An alternative to this approach is to ignore the signal for the whole
+// process:
+//   signal(SIGPIPE, SIG_IGN);
+void IgnoreSigPipeSignalOnCurrentThread() {
+  sigset_t sigpipe_mask;
+  sigemptyset(&sigpipe_mask);
+  sigaddset(&sigpipe_mask, SIGPIPE);
+  pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
+}
 
 struct TimerEvent {
   explicit TimerEvent(std::unique_ptr<QueuedTask> task)
@@ -71,43 +94,88 @@
   std::list<TimerEvent*> pending_timers_;
 };
 
+// Posting a reply task is tricky business. This class owns the reply task
+// and a reference to it is held by both the reply queue and the first task.
+// Here's an outline of what happens when dealing with a reply task.
+// * The ReplyTaskOwner owns the |reply_| task.
+// * One ref owned by PostAndReplyTask
+// * One ref owned by the reply TaskQueue
+// * ReplyTaskOwner has a flag |run_task_| initially set to false.
+// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
+// * After successfully running the original |task_|, PostAndReplyTask() calls
+//   set_should_run_task(). This sets |run_task_| to true.
+// * In PostAndReplyTask's dtor:
+//   * It releases its reference to ReplyTaskOwner (important to do this first).
+//   * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
+// * PostAndReplyTask doesn't care if write() fails, but when it does:
+//   * The reply queue is gone.
+//   * ReplyTaskOwner has already been deleted and the reply task too.
+// * If write() succeeds:
+//   * ReplyQueue receives the kRunReplyTask message
+//   * Goes through all pending tasks, finding the first that HasOneRef()
+//   * Calls ReplyTaskOwner::Run()
+//     * if set_should_run_task() was called, the reply task will be run
+//   * Release the reference to ReplyTaskOwner
+//   * ReplyTaskOwner and associated |reply_| are deleted.
+class TaskQueue::ReplyTaskOwner {
+ public:
+  ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
+      : reply_(std::move(reply)) {}
+
+  void Run() {
+    RTC_DCHECK(reply_);
+    if (run_task_) {
+      if (!reply_->Run())
+        reply_.release();
+    }
+    reply_.reset();
+  }
+
+  void set_should_run_task() {
+    RTC_DCHECK(!run_task_);
+    run_task_ = true;
+  }
+
+ private:
+  std::unique_ptr<QueuedTask> reply_;
+  bool run_task_ = false;
+};
+
 class TaskQueue::PostAndReplyTask : public QueuedTask {
  public:
   PostAndReplyTask(std::unique_ptr<QueuedTask> task,
                    std::unique_ptr<QueuedTask> reply,
-                   TaskQueue* reply_queue)
+                   TaskQueue* reply_queue,
+                   int reply_pipe)
       : task_(std::move(task)),
-        reply_(std::move(reply)),
-        reply_queue_(reply_queue) {
-    reply_queue->PrepareReplyTask(this);
+        reply_pipe_(reply_pipe),
+        reply_task_owner_(
+            new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
+    reply_queue->PrepareReplyTask(reply_task_owner_);
   }
 
   ~PostAndReplyTask() override {
-    CritScope lock(&lock_);
-    if (reply_queue_)
-      reply_queue_->ReplyTaskDone(this);
-  }
-
-  void OnReplyQueueGone() {
-    CritScope lock(&lock_);
-    reply_queue_ = nullptr;
+    reply_task_owner_ = nullptr;
+    IgnoreSigPipeSignalOnCurrentThread();
+    // Send a signal to the reply queue that the reply task can run now.
+    // Depending on whether |set_should_run_task()| was called by the
+    // PostAndReplyTask(), the reply task may or may not actually run.
+    // In either case, it will be deleted.
+    char message = kRunReplyTask;
+    write(reply_pipe_, &message, sizeof(message));
   }
 
  private:
   bool Run() override {
     if (!task_->Run())
       task_.release();
-
-    CritScope lock(&lock_);
-    if (reply_queue_)
-      reply_queue_->PostTask(std::move(reply_));
+    reply_task_owner_->set_should_run_task();
     return true;
   }
 
-  CriticalSection lock_;
   std::unique_ptr<QueuedTask> task_;
-  std::unique_ptr<QueuedTask> reply_;
-  TaskQueue* reply_queue_ GUARDED_BY(lock_);
+  int reply_pipe_;
+  scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
 };
 
 class TaskQueue::SetTimerTask : public QueuedTask {
@@ -144,6 +212,7 @@
   SetNonBlocking(fds[1]);
   wakeup_pipe_out_ = fds[0];
   wakeup_pipe_in_ = fds[1];
+
   EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
               EV_READ | EV_PERSIST, OnWakeup, this);
   event_add(wakeup_event_.get(), 0);
@@ -165,20 +234,14 @@
   thread_.Stop();
 
   event_del(wakeup_event_.get());
+
+  IgnoreSigPipeSignalOnCurrentThread();
+
   close(wakeup_pipe_in_);
   close(wakeup_pipe_out_);
   wakeup_pipe_in_ = -1;
   wakeup_pipe_out_ = -1;
 
-  {
-    // Synchronize against any pending reply tasks that might be running on
-    // other queues.
-    CritScope lock(&pending_lock_);
-    for (auto* reply : pending_replies_)
-      reply->OnReplyQueueGone();
-    pending_replies_.clear();
-  }
-
   event_base_free(event_base_);
 }
 
@@ -246,7 +309,8 @@
                                  std::unique_ptr<QueuedTask> reply,
                                  TaskQueue* reply_queue) {
   std::unique_ptr<QueuedTask> wrapper_task(
-      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
+                           reply_queue->wakeup_pipe_in_));
   PostTask(std::move(wrapper_task));
 }
 
@@ -296,6 +360,22 @@
         task.release();
       break;
     }
+    case kRunReplyTask: {
+      scoped_refptr<ReplyTaskOwnerRef> reply_task;
+      {
+        CritScope lock(&ctx->queue->pending_lock_);
+        for (auto it = ctx->queue->pending_replies_.begin();
+             it != ctx->queue->pending_replies_.end(); ++it) {
+          if ((*it)->HasOneRef()) {
+            reply_task = std::move(*it);
+            ctx->queue->pending_replies_.erase(it);
+            break;
+          }
+        }
+      }
+      reply_task->Run();
+      break;
+    }
     default:
       RTC_NOTREACHED();
       break;
@@ -320,15 +400,10 @@
   delete timer;
 }
 
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
   RTC_DCHECK(reply_task);
   CritScope lock(&pending_lock_);
-  pending_replies_.push_back(reply_task);
-}
-
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
-  CritScope lock(&pending_lock_);
-  pending_replies_.remove(reply_task);
+  pending_replies_.push_back(std::move(reply_task));
 }
 
 }  // namespace rtc