Fixing a couple cases that cause ProcessAllMessageQueues to hang.

The two situations are:
1. A thread is in the process of shutting down, so it won't handle any
   more messages.
2. A message queue is cleared before it has a chance to process pending
   messages.

In both of those cases, we should consider processing done at that
point.

R=honghaiz@webrtc.org, pthatcher@webrtc.org

Review URL: https://codereview.webrtc.org/2319303004 .

Cr-Commit-Position: refs/heads/master@{#14245}
diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc
index ebf98f5..503d5af 100644
--- a/webrtc/base/messagequeue.cc
+++ b/webrtc/base/messagequeue.cc
@@ -126,17 +126,34 @@
 }
 
 void MessageQueueManager::ProcessAllMessageQueuesInternal() {
-  // Post a delayed message at the current time and wait for it to be dispatched
-  // on all queues, which will ensure that all messages that came before it were
-  // also dispatched.
-  volatile int queues_not_done;
-  auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
-  FunctorMessageHandler<void, decltype(functor)> handler(functor);
+  // This works by posting a delayed message at the current time and waiting
+  // for it to be dispatched on all queues, which will ensure that all messages
+  // that came before it were also dispatched.
+  volatile int queues_not_done = 0;
+
+  // This class is used so that whether the posted message is processed, or the
+  // message queue is simply cleared, queues_not_done gets decremented.
+  class ScopedIncrement : public MessageData {
+   public:
+    ScopedIncrement(volatile int* value) : value_(value) {
+      AtomicOps::Increment(value_);
+    }
+    ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
+
+   private:
+    volatile int* value_;
+  };
+
   {
     DebugNonReentrantCritScope cs(&crit_, &locked_);
-    queues_not_done = static_cast<int>(message_queues_.size());
     for (MessageQueue* queue : message_queues_) {
-      queue->PostDelayed(RTC_FROM_HERE, 0, &handler);
+      if (queue->IsQuitting()) {
+        // If the queue is quitting, it's done processing messages so it can
+        // be ignored. If we tried to post a message to it, it would be dropped.
+        continue;
+      }
+      queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
+                         new ScopedIncrement(&queues_not_done));
     }
   }
   // Note: One of the message queues may have been on this thread, which is why
diff --git a/webrtc/base/messagequeue_unittest.cc b/webrtc/base/messagequeue_unittest.cc
index fc3a8f7..8834ab5 100644
--- a/webrtc/base/messagequeue_unittest.cc
+++ b/webrtc/base/messagequeue_unittest.cc
@@ -10,7 +10,11 @@
 
 #include "webrtc/base/messagequeue.h"
 
+#include <functional>
+
+#include "webrtc/base/atomicops.h"
 #include "webrtc/base/bind.h"
+#include "webrtc/base/event.h"
 #include "webrtc/base/gunit.h"
 #include "webrtc/base/logging.h"
 #include "webrtc/base/thread.h"
@@ -140,3 +144,74 @@
   EXPECT_TRUE(deleted);
   EXPECT_FALSE(MessageQueueManager::IsInitialized());
 }
+
+// Ensure that ProcessAllMessageQueues does its essential function; process
+// all messages (both delayed and non delayed) up until the current time, on
+// all registered message queues.
+TEST(MessageQueueManager, ProcessAllMessageQueues) {
+  Event entered_process_all_message_queues(true, false);
+  Thread a;
+  Thread b;
+  a.Start();
+  b.Start();
+
+  volatile int messages_processed = 0;
+  FunctorMessageHandler<void, std::function<void()>> incrementer(
+      [&messages_processed, &entered_process_all_message_queues] {
+        // Wait for event as a means to ensure Increment doesn't occur outside
+        // of ProcessAllMessageQueues. The event is set by a message posted to
+        // the main thread, which is guaranteed to be handled inside
+        // ProcessAllMessageQueues.
+        entered_process_all_message_queues.Wait(Event::kForever);
+        AtomicOps::Increment(&messages_processed);
+      });
+  FunctorMessageHandler<void, std::function<void()>> event_signaler(
+      [&entered_process_all_message_queues] {
+        entered_process_all_message_queues.Set();
+      });
+
+  // Post messages (both delayed and non delayed) to both threads.
+  a.Post(RTC_FROM_HERE, &incrementer);
+  b.Post(RTC_FROM_HERE, &incrementer);
+  a.PostDelayed(RTC_FROM_HERE, 0, &incrementer);
+  b.PostDelayed(RTC_FROM_HERE, 0, &incrementer);
+  rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
+
+  MessageQueueManager::ProcessAllMessageQueues();
+  EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed));
+}
+
+// Test that ProcessAllMessageQueues doesn't hang if a thread is quitting.
+TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) {
+  Thread t;
+  t.Start();
+  t.Quit();
+  MessageQueueManager::ProcessAllMessageQueues();
+}
+
+// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
+// messages.
+TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) {
+  Event entered_process_all_message_queues(true, false);
+  Thread t;
+  t.Start();
+
+  FunctorMessageHandler<void, std::function<void()>> clearer(
+      [&entered_process_all_message_queues] {
+        // Wait for event as a means to ensure Clear doesn't occur outside of
+        // ProcessAllMessageQueues. The event is set by a message posted to the
+        // main thread, which is guaranteed to be handled inside
+        // ProcessAllMessageQueues.
+        entered_process_all_message_queues.Wait(Event::kForever);
+        rtc::Thread::Current()->Clear(nullptr);
+      });
+  FunctorMessageHandler<void, std::function<void()>> event_signaler(
+      [&entered_process_all_message_queues] {
+        entered_process_all_message_queues.Set();
+      });
+
+  // Post messages (both delayed and non delayed) to both threads.
+  t.Post(RTC_FROM_HERE, &clearer);
+  rtc::Thread::Current()->Post(RTC_FROM_HERE, &event_signaler);
+  MessageQueueManager::ProcessAllMessageQueues();
+}