Reland: Prevent data race in MessageQueue.
The CL prevents a data race in MessageQueue where the variable "ss_" is
modified without a lock while sometimes read inside a lock.
Also thread annotations have been added to the MessageQueue class.
This was already reviewed and landed in https://codereview.webrtc.org/1675923002/
but failed in Chromium GN builds due to sharedexclusivelock.cc not being
compiled in these builds. This changed in https://codereview.webrtc.org/1712773003/
so the reland should work fine now.
BUG=webrtc:5496
Review URL: https://codereview.webrtc.org/1729893002
Cr-Commit-Position: refs/heads/master@{#11758}
diff --git a/webrtc/base/messagequeue.cc b/webrtc/base/messagequeue.cc
index bbdb941..61aa611 100644
--- a/webrtc/base/messagequeue.cc
+++ b/webrtc/base/messagequeue.cc
@@ -117,8 +117,8 @@
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
- : ss_(ss), fStop_(false), fPeekKeep_(false),
- dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
+ : fStop_(false), fPeekKeep_(false),
+ dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
if (!ss_) {
// Currently, MessageQueue holds a socket server, and is the base class for
// Thread. It seems like it makes more sense for Thread to hold the socket
@@ -159,19 +159,37 @@
SignalQueueDestroyed();
MessageQueueManager::Remove(this);
Clear(NULL);
+
+ SharedScope ss(&ss_lock_);
if (ss_) {
ss_->SetMessageQueue(NULL);
}
}
+SocketServer* MessageQueue::socketserver() {
+ SharedScope ss(&ss_lock_);
+ return ss_;
+}
+
void MessageQueue::set_socketserver(SocketServer* ss) {
+ // Need to lock exclusively here to prevent simultaneous modifications from
+ // other threads. Can't be a shared lock to prevent races with other reading
+ // threads.
+ // Other places that only read "ss_" can use a shared lock as simultaneous
+ // read access is allowed.
+ ExclusiveScope es(&ss_lock_);
ss_ = ss ? ss : default_ss_.get();
ss_->SetMessageQueue(this);
}
+void MessageQueue::WakeUpSocketServer() {
+ SharedScope ss(&ss_lock_);
+ ss_->WakeUp();
+}
+
void MessageQueue::Quit() {
fStop_ = true;
- ss_->WakeUp();
+ WakeUpSocketServer();
}
bool MessageQueue::IsQuitting() {
@@ -277,9 +295,12 @@
cmsNext = cmsDelayNext;
}
- // Wait and multiplex in the meantime
- if (!ss_->Wait(cmsNext, process_io))
- return false;
+ {
+ // Wait and multiplex in the meantime
+ SharedScope ss(&ss_lock_);
+ if (!ss_->Wait(cmsNext, process_io))
+ return false;
+ }
// If the specified timeout expired, return
@@ -307,16 +328,18 @@
// Add the message to the end of the queue
// Signal for the multiplexer to return
- CritScope cs(&crit_);
- Message msg;
- msg.phandler = phandler;
- msg.message_id = id;
- msg.pdata = pdata;
- if (time_sensitive) {
- msg.ts_sensitive = Time() + kMaxMsgLatency;
+ {
+ CritScope cs(&crit_);
+ Message msg;
+ msg.phandler = phandler;
+ msg.message_id = id;
+ msg.pdata = pdata;
+ if (time_sensitive) {
+ msg.ts_sensitive = Time() + kMaxMsgLatency;
+ }
+ msgq_.push_back(msg);
}
- msgq_.push_back(msg);
- ss_->WakeUp();
+ WakeUpSocketServer();
}
void MessageQueue::PostDelayed(int cmsDelay,
@@ -345,18 +368,20 @@
// Add to the priority queue. Gets sorted soonest first.
// Signal for the multiplexer to return.
- CritScope cs(&crit_);
- Message msg;
- msg.phandler = phandler;
- msg.message_id = id;
- msg.pdata = pdata;
- DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
- dmsgq_.push(dmsg);
- // If this message queue processes 1 message every millisecond for 50 days,
- // we will wrap this number. Even then, only messages with identical times
- // will be misordered, and then only briefly. This is probably ok.
- VERIFY(0 != ++dmsgq_next_num_);
- ss_->WakeUp();
+ {
+ CritScope cs(&crit_);
+ Message msg;
+ msg.phandler = phandler;
+ msg.message_id = id;
+ msg.pdata = pdata;
+ DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
+ dmsgq_.push(dmsg);
+ // If this message queue processes 1 message every millisecond for 50 days,
+ // we will wrap this number. Even then, only messages with identical times
+ // will be misordered, and then only briefly. This is probably ok.
+ VERIFY(0 != ++dmsgq_next_num_);
+ }
+ WakeUpSocketServer();
}
int MessageQueue::GetDelay() {