Reland: Prevent data race in MessageQueue.

This CL prevents a data race in MessageQueue where the member variable "ss_"
was modified without a lock while other threads read it, sometimes under a lock.

Thread annotations have also been added to the MessageQueue class.
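
For reference, a minimal sketch of what the header side of this change could
look like (illustrative only; messagequeue.h is not part of this diff, and the
sketch assumes SharedExclusiveLock/SharedScope from
webrtc/base/sharedexclusivelock.h and the GUARDED_BY macro from
webrtc/base/thread_annotations.h):

  // Illustrative sketch, not the actual messagequeue.h change.
  #include "webrtc/base/sharedexclusivelock.h"
  #include "webrtc/base/thread_annotations.h"

  class MessageQueue {
   public:
    // Reads "ss_" under a shared lock; concurrent readers are allowed.
    SocketServer* socketserver();
    // Writes "ss_" under an exclusive lock (see the .cc change below).
    void set_socketserver(SocketServer* ss);

   protected:
    // Wakes the socket server while holding ss_lock_ shared, so Quit() and
    // Post()/PostDelayed() no longer touch "ss_" directly.
    void WakeUpSocketServer();

   private:
    // GUARDED_BY lets the compiler's thread-safety analysis flag any access
    // to "ss_" made without holding ss_lock_.
    SharedExclusiveLock ss_lock_;
    SocketServer* ss_ GUARDED_BY(ss_lock_);
  };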

This was already reviewed and landed in https://codereview.webrtc.org/1675923002/
but broke Chromium GN builds because sharedexclusivelock.cc was not compiled in
those builds. That was fixed in https://codereview.webrtc.org/1712773003/, so the
reland should work fine now.

BUG=webrtc:5496

Review URL: https://codereview.webrtc.org/1729893002

Cr-Commit-Position: refs/heads/master@{#11758}
diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc
index bbdb941..61aa611 100644
--- a/webrtc/base/messagequeue.cc
+++ b/webrtc/base/messagequeue.cc
@@ -117,8 +117,8 @@
 // MessageQueue
 
 MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
-    : ss_(ss), fStop_(false), fPeekKeep_(false),
-      dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
+    : fStop_(false), fPeekKeep_(false),
+      dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
   if (!ss_) {
     // Currently, MessageQueue holds a socket server, and is the base class for
     // Thread.  It seems like it makes more sense for Thread to hold the socket
@@ -159,19 +159,37 @@
   SignalQueueDestroyed();
   MessageQueueManager::Remove(this);
   Clear(NULL);
+
+  SharedScope ss(&ss_lock_);
   if (ss_) {
     ss_->SetMessageQueue(NULL);
   }
 }
 
+SocketServer* MessageQueue::socketserver() {
+  SharedScope ss(&ss_lock_);
+  return ss_;
+}
+
 void MessageQueue::set_socketserver(SocketServer* ss) {
+  // Need to lock exclusively here to prevent simultaneous modifications from
+  // other threads. Can't be a shared lock to prevent races with other reading
+  // threads.
+  // Other places that only read "ss_" can use a shared lock as simultaneous
+  // read access is allowed.
+  ExclusiveScope es(&ss_lock_);
   ss_ = ss ? ss : default_ss_.get();
   ss_->SetMessageQueue(this);
 }
 
+void MessageQueue::WakeUpSocketServer() {
+  SharedScope ss(&ss_lock_);
+  ss_->WakeUp();
+}
+
 void MessageQueue::Quit() {
   fStop_ = true;
-  ss_->WakeUp();
+  WakeUpSocketServer();
 }
 
 bool MessageQueue::IsQuitting() {
@@ -277,9 +295,12 @@
         cmsNext = cmsDelayNext;
     }
 
-    // Wait and multiplex in the meantime
-    if (!ss_->Wait(cmsNext, process_io))
-      return false;
+    {
+      // Wait and multiplex in the meantime
+      SharedScope ss(&ss_lock_);
+      if (!ss_->Wait(cmsNext, process_io))
+        return false;
+    }
 
     // If the specified timeout expired, return
 
@@ -307,16 +328,18 @@
   // Add the message to the end of the queue
   // Signal for the multiplexer to return
 
-  CritScope cs(&crit_);
-  Message msg;
-  msg.phandler = phandler;
-  msg.message_id = id;
-  msg.pdata = pdata;
-  if (time_sensitive) {
-    msg.ts_sensitive = Time() + kMaxMsgLatency;
+  {
+    CritScope cs(&crit_);
+    Message msg;
+    msg.phandler = phandler;
+    msg.message_id = id;
+    msg.pdata = pdata;
+    if (time_sensitive) {
+      msg.ts_sensitive = Time() + kMaxMsgLatency;
+    }
+    msgq_.push_back(msg);
   }
-  msgq_.push_back(msg);
-  ss_->WakeUp();
+  WakeUpSocketServer();
 }
 
 void MessageQueue::PostDelayed(int cmsDelay,
@@ -345,18 +368,20 @@
   // Add to the priority queue. Gets sorted soonest first.
   // Signal for the multiplexer to return.
 
-  CritScope cs(&crit_);
-  Message msg;
-  msg.phandler = phandler;
-  msg.message_id = id;
-  msg.pdata = pdata;
-  DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
-  dmsgq_.push(dmsg);
-  // If this message queue processes 1 message every millisecond for 50 days,
-  // we will wrap this number.  Even then, only messages with identical times
-  // will be misordered, and then only briefly.  This is probably ok.
-  VERIFY(0 != ++dmsgq_next_num_);
-  ss_->WakeUp();
+  {
+    CritScope cs(&crit_);
+    Message msg;
+    msg.phandler = phandler;
+    msg.message_id = id;
+    msg.pdata = pdata;
+    DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
+    dmsgq_.push(dmsg);
+    // If this message queue processes 1 message every millisecond for 50 days,
+    // we will wrap this number.  Even then, only messages with identical times
+    // will be misordered, and then only briefly.  This is probably ok.
+    VERIFY(0 != ++dmsgq_next_num_);
+  }
+  WakeUpSocketServer();
 }
 
 int MessageQueue::GetDelay() {