In rtc::Thread hide MessageHandler handling as implementation details
Remove Peek function as unused
Move Get and Dispatch into private section to ensure they are not used
from outside.
Bug: webrtc:9702
Change-Id: Ibd0b236fe43543d60f97f988524526493bbeaaa7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272804
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37889}
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 56b6b43..9946989 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -380,8 +380,7 @@
: Thread(std::move(ss), /*do_init=*/true) {}
Thread::Thread(SocketServer* ss, bool do_init)
- : fPeekKeep_(false),
- delayed_next_num_(0),
+ : delayed_next_num_(0),
fInitialized_(false),
fDestroyed_(false),
stop_(0),
@@ -450,28 +449,7 @@
stop_.store(0, std::memory_order_release);
}
-bool Thread::Peek(Message* pmsg, int cmsWait) {
- if (fPeekKeep_) {
- *pmsg = msgPeek_;
- return true;
- }
- if (!Get(pmsg, cmsWait))
- return false;
- msgPeek_ = *pmsg;
- fPeekKeep_ = true;
- return true;
-}
-
bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
- // Return and clear peek if present
- // Always return the peek if it exists so there is Peek/Get symmetry
-
- if (fPeekKeep_) {
- *pmsg = msgPeek_;
- fPeekKeep_ = false;
- return true;
- }
-
// Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
int64_t cmsTotal = cmsWait;
@@ -650,17 +628,6 @@
void Thread::ClearInternal(MessageHandler* phandler,
uint32_t id,
MessageList* removed) {
- // Remove messages with phandler
-
- if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
- if (removed) {
- removed->push_back(msgPeek_);
- } else {
- delete msgPeek_.pdata;
- }
- fPeekKeep_ = false;
- }
-
// Remove from ordered message queue
for (auto it = messages_.begin(); it != messages_.end();) {
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 77ccc9e..ef43e51 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -268,14 +268,6 @@
// Processed. Normally, this would be true until IsQuitting() is true.
virtual bool IsProcessingMessagesForTesting();
- // Get() will process I/O until:
- // 1) A message is available (returns true)
- // 2) cmsWait seconds have elapsed (returns false)
- // 3) Stop() is called (returns false)
- virtual bool Get(Message* pmsg,
- int cmsWait = kForever,
- bool process_io = true);
- virtual bool Peek(Message* pmsg, int cmsWait = 0);
// `time_sensitive` is deprecated and should always be false.
virtual void Post(const Location& posted_from,
MessageHandler* phandler,
@@ -295,7 +287,6 @@
virtual void Clear(MessageHandler* phandler,
uint32_t id = MQID_ANY,
MessageList* removed = nullptr);
- virtual void Dispatch(Message* pmsg);
// Amount of time until the next message can be retrieved
virtual int GetDelay();
@@ -303,7 +294,7 @@
bool empty() const { return size() == 0u; }
size_t size() const {
CritScope cs(&crit_);
- return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u);
+ return messages_.size() + delayed_messages_.size();
}
// Internally posts a message which causes the doomed object to be deleted
@@ -522,6 +513,21 @@
private:
static const int kSlowDispatchLoggingThreshold = 50; // 50 ms
+ // TODO(bugs.webrtc.org/9702): Delete when chromium stops overriding it.
+ // chromium's ThreadWrapper overrides it just to check it is never called.
+ virtual bool Peek(Message* pmsg, int cms_wait) {
+ RTC_DCHECK_NOTREACHED();
+ return false;
+ }
+ // Get() will process I/O until:
+ // 1) A message is available (returns true)
+ // 2) cmsWait seconds have elapsed (returns false)
+ // 3) Stop() is called (returns false)
+ virtual bool Get(Message* pmsg,
+ int cmsWait = kForever,
+ bool process_io = true);
+ virtual void Dispatch(Message* pmsg);
+
// Sets the per-thread allow-blocking-calls flag and returns the previous
// value. Must be called on this thread.
bool SetAllowBlockingCalls(bool allow);
@@ -552,8 +558,6 @@
// Called by the ThreadManager when being unset as the current thread.
void ClearCurrentTaskQueue();
- bool fPeekKeep_;
- Message msgPeek_;
MessageList messages_ RTC_GUARDED_BY(crit_);
PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index b06b09e..68416f5 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -20,6 +20,7 @@
#include "rtc_base/async_udp_socket.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
+#include "rtc_base/fake_clock.h"
#include "rtc_base/gunit.h"
#include "rtc_base/internal/default_socket_server.h"
#include "rtc_base/null_socket_server.h"
@@ -27,6 +28,7 @@
#include "rtc_base/socket_address.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
+#include "test/gmock.h"
#include "test/testsupport/rtc_expect_death.h"
#if defined(WEBRTC_WIN)
@@ -37,6 +39,7 @@
namespace rtc {
namespace {
+using ::testing::ElementsAre;
using ::webrtc::TimeDelta;
// Generates a sequence of numbers (collaboratively).
@@ -585,32 +588,39 @@
bool* deleted;
};
-static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) {
- EXPECT_TRUE(q != nullptr);
+static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(
+ FakeClock& clock,
+ Thread& q) {
+ std::vector<int> run_order;
+
+ Event done;
int64_t now = TimeMillis();
- q->PostAt(RTC_FROM_HERE, now, nullptr, 3);
- q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0);
- q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1);
- q->PostAt(RTC_FROM_HERE, now, nullptr, 4);
- q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2);
+ q.PostDelayedTask([&] { run_order.push_back(3); }, TimeDelta::Millis(3));
+ q.PostDelayedTask([&] { run_order.push_back(0); }, TimeDelta::Millis(1));
+ q.PostDelayedTask([&] { run_order.push_back(1); }, TimeDelta::Millis(2));
+ q.PostDelayedTask([&] { run_order.push_back(4); }, TimeDelta::Millis(3));
+ q.PostDelayedTask([&] { run_order.push_back(2); }, TimeDelta::Millis(2));
+ q.PostDelayedTask([&] { done.Set(); }, TimeDelta::Millis(4));
+ // Validate time was frozen while tasks were posted.
+ RTC_DCHECK_EQ(TimeMillis(), now);
- Message msg;
- for (size_t i = 0; i < 5; ++i) {
- memset(&msg, 0, sizeof(msg));
- EXPECT_TRUE(q->Get(&msg, 0));
- EXPECT_EQ(i, msg.message_id);
- }
+ // Change time to make all tasks ready to run and wait for them.
+ clock.AdvanceTime(TimeDelta::Millis(4));
+ ASSERT_TRUE(done.Wait(TimeDelta::Seconds(1)));
- EXPECT_FALSE(q->Get(&msg, 0)); // No more messages
+ EXPECT_THAT(run_order, ElementsAre(0, 1, 2, 3, 4));
}
TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) {
+ ScopedBaseFakeClock clock;
Thread q(CreateDefaultSocketServer(), true);
- DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q);
+ q.Start();
+ DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q);
NullSocketServer nullss;
Thread q_nullss(&nullss, true);
- DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss);
+ q_nullss.Start();
+ DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q_nullss);
}
TEST_F(ThreadQueueTest, DisposeNotLocked) {
@@ -619,7 +629,7 @@
DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted);
Dispose(d);
Message msg;
- EXPECT_FALSE(Get(&msg, 0));
+ ProcessMessages(0);
EXPECT_TRUE(deleted);
EXPECT_FALSE(was_locked);
}
@@ -642,7 +652,7 @@
// Now, post a message, which should *not* be returned by Get().
Post(RTC_FROM_HERE, handler, 1);
Message msg;
- EXPECT_FALSE(Get(&msg, 0));
+ ProcessMessages(0);
EXPECT_TRUE(deleted);
}
diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc
index 91b486b..a055e59 100644
--- a/rtc_base/virtual_socket_server.cc
+++ b/rtc_base/virtual_socket_server.cc
@@ -647,10 +647,7 @@
fake_clock_->AdvanceTime(webrtc::TimeDelta::Millis(1));
} else {
// Otherwise, run a normal message loop.
- Message msg;
- if (msg_queue_->Get(&msg, Thread::kForever)) {
- msg_queue_->Dispatch(&msg);
- }
+ msg_queue_->ProcessMessages(Thread::kForever);
}
}
stop_on_idle_ = false;