In rtc::Thread hide MessageHandler handling as implementation details

Remote Peek function as unused
Move Get and Dispatch into private section to ensure they are not used
from outside.

Bug: webrtc:9702
Change-Id: Ibd0b236fe43543d60f97f988524526493bbeaaa7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272804
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37889}
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 56b6b43..9946989 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -380,8 +380,7 @@
     : Thread(std::move(ss), /*do_init=*/true) {}
 
 Thread::Thread(SocketServer* ss, bool do_init)
-    : fPeekKeep_(false),
-      delayed_next_num_(0),
+    : delayed_next_num_(0),
       fInitialized_(false),
       fDestroyed_(false),
       stop_(0),
@@ -450,28 +449,7 @@
   stop_.store(0, std::memory_order_release);
 }
 
-bool Thread::Peek(Message* pmsg, int cmsWait) {
-  if (fPeekKeep_) {
-    *pmsg = msgPeek_;
-    return true;
-  }
-  if (!Get(pmsg, cmsWait))
-    return false;
-  msgPeek_ = *pmsg;
-  fPeekKeep_ = true;
-  return true;
-}
-
 bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
-  // Return and clear peek if present
-  // Always return the peek if it exists so there is Peek/Get symmetry
-
-  if (fPeekKeep_) {
-    *pmsg = msgPeek_;
-    fPeekKeep_ = false;
-    return true;
-  }
-
   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
 
   int64_t cmsTotal = cmsWait;
@@ -650,17 +628,6 @@
 void Thread::ClearInternal(MessageHandler* phandler,
                            uint32_t id,
                            MessageList* removed) {
-  // Remove messages with phandler
-
-  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
-    if (removed) {
-      removed->push_back(msgPeek_);
-    } else {
-      delete msgPeek_.pdata;
-    }
-    fPeekKeep_ = false;
-  }
-
   // Remove from ordered message queue
 
   for (auto it = messages_.begin(); it != messages_.end();) {
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 77ccc9e..ef43e51 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -268,14 +268,6 @@
   // Processed.  Normally, this would be true until IsQuitting() is true.
   virtual bool IsProcessingMessagesForTesting();
 
-  // Get() will process I/O until:
-  //  1) A message is available (returns true)
-  //  2) cmsWait seconds have elapsed (returns false)
-  //  3) Stop() is called (returns false)
-  virtual bool Get(Message* pmsg,
-                   int cmsWait = kForever,
-                   bool process_io = true);
-  virtual bool Peek(Message* pmsg, int cmsWait = 0);
   // `time_sensitive` is deprecated and should always be false.
   virtual void Post(const Location& posted_from,
                     MessageHandler* phandler,
@@ -295,7 +287,6 @@
   virtual void Clear(MessageHandler* phandler,
                      uint32_t id = MQID_ANY,
                      MessageList* removed = nullptr);
-  virtual void Dispatch(Message* pmsg);
 
   // Amount of time until the next message can be retrieved
   virtual int GetDelay();
@@ -303,7 +294,7 @@
   bool empty() const { return size() == 0u; }
   size_t size() const {
     CritScope cs(&crit_);
-    return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u);
+    return messages_.size() + delayed_messages_.size();
   }
 
   // Internally posts a message which causes the doomed object to be deleted
@@ -522,6 +513,21 @@
  private:
   static const int kSlowDispatchLoggingThreshold = 50;  // 50 ms
 
+  // TODO(bugs.webrtc.org/9702): Delete when chromium stops overriding it.
+  // chromium's ThreadWrapper overrides it just to check it is never called.
+  virtual bool Peek(Message* pmsg, int cms_wait) {
+    RTC_DCHECK_NOTREACHED();
+    return false;
+  }
+  // Get() will process I/O until:
+  //  1) A message is available (returns true)
+  //  2) cmsWait seconds have elapsed (returns false)
+  //  3) Stop() is called (returns false)
+  virtual bool Get(Message* pmsg,
+                   int cmsWait = kForever,
+                   bool process_io = true);
+  virtual void Dispatch(Message* pmsg);
+
   // Sets the per-thread allow-blocking-calls flag and returns the previous
   // value. Must be called on this thread.
   bool SetAllowBlockingCalls(bool allow);
@@ -552,8 +558,6 @@
   // Called by the ThreadManager when being unset as the current thread.
   void ClearCurrentTaskQueue();
 
-  bool fPeekKeep_;
-  Message msgPeek_;
   MessageList messages_ RTC_GUARDED_BY(crit_);
   PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
   uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index b06b09e..68416f5 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -20,6 +20,7 @@
 #include "rtc_base/async_udp_socket.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/event.h"
+#include "rtc_base/fake_clock.h"
 #include "rtc_base/gunit.h"
 #include "rtc_base/internal/default_socket_server.h"
 #include "rtc_base/null_socket_server.h"
@@ -27,6 +28,7 @@
 #include "rtc_base/socket_address.h"
 #include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
+#include "test/gmock.h"
 #include "test/testsupport/rtc_expect_death.h"
 
 #if defined(WEBRTC_WIN)
@@ -37,6 +39,7 @@
 namespace rtc {
 namespace {
 
+using ::testing::ElementsAre;
 using ::webrtc::TimeDelta;
 
 // Generates a sequence of numbers (collaboratively).
@@ -585,32 +588,39 @@
   bool* deleted;
 };
 
-static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) {
-  EXPECT_TRUE(q != nullptr);
+static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
+    FakeClock& clock,
+    Thread& q) {
+  std::vector<int> run_order;
+
+  Event done;
   int64_t now = TimeMillis();
-  q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
-  q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
-  q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
-  q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
-  q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
+  q.PostDelayedTask([&] { run_order.push_back(3); }, TimeDelta::Millis(3));
+  q.PostDelayedTask([&] { run_order.push_back(0); }, TimeDelta::Millis(1));
+  q.PostDelayedTask([&] { run_order.push_back(1); }, TimeDelta::Millis(2));
+  q.PostDelayedTask([&] { run_order.push_back(4); }, TimeDelta::Millis(3));
+  q.PostDelayedTask([&] { run_order.push_back(2); }, TimeDelta::Millis(2));
+  q.PostDelayedTask([&] { done.Set(); }, TimeDelta::Millis(4));
+  // Validate time was frozen while tasks were posted.
+  RTC_DCHECK_EQ(TimeMillis(), now);
 
-  Message msg;
-  for (size_t i = 0; i < 5; ++i) {
-    memset(&msg, 0, sizeof(msg));
-    EXPECT_TRUE(q->Get(&msg, 0));
-    EXPECT_EQ(i, msg.message_id);
-  }
+  // Change time to make all tasks ready to run and wait for them.
+  clock.AdvanceTime(TimeDelta::Millis(4));
+  ASSERT_TRUE(done.Wait(TimeDelta::Seconds(1)));
 
-  EXPECT_FALSE(q->Get(&msg, 0));  // No more messages
+  EXPECT_THAT(run_order, ElementsAre(0, 1, 2, 3, 4));
 }
 
 TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
+  ScopedBaseFakeClock clock;
   Thread q(CreateDefaultSocketServer(), true);
-  DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
+  q.Start();
+  DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q);
 
   NullSocketServer nullss;
   Thread q_nullss(&nullss, true);
-  DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
+  q_nullss.Start();
+  DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q_nullss);
 }
 
 TEST_F(ThreadQueueTest, DisposeNotLocked) {
@@ -619,7 +629,7 @@
   DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
   Dispose(d);
   Message msg;
-  EXPECT_FALSE(Get(&msg, 0));
+  ProcessMessages(0);
   EXPECT_TRUE(deleted);
   EXPECT_FALSE(was_locked);
 }
@@ -642,7 +652,7 @@
   // Now, post a message, which should *not* be returned by Get().
   Post(RTC_FROM_HERE, handler, 1);
   Message msg;
-  EXPECT_FALSE(Get(&msg, 0));
+  ProcessMessages(0);
   EXPECT_TRUE(deleted);
 }
 
diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc
index 91b486b..a055e59 100644
--- a/rtc_base/virtual_socket_server.cc
+++ b/rtc_base/virtual_socket_server.cc
@@ -647,10 +647,7 @@
       fake_clock_->AdvanceTime(webrtc::TimeDelta::Millis(1));
     } else {
       // Otherwise, run a normal message loop.
-      Message msg;
-      if (msg_queue_->Get(&msg, Thread::kForever)) {
-        msg_queue_->Dispatch(&msg);
-      }
+      msg_queue_->ProcessMessages(Thread::kForever);
     }
   }
   stop_on_idle_ = false;