/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
deadbeeff5f03e82016-06-06 11:16:06 -070012#include "webrtc/base/atomicops.h"
danilchapbebf54c2016-04-28 01:32:48 -070013#include "webrtc/base/checks.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000014#include "webrtc/base/logging.h"
15#include "webrtc/base/messagequeue.h"
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070016#include "webrtc/base/stringencode.h"
deadbeeff5f03e82016-06-06 11:16:06 -070017#include "webrtc/base/thread.h"
pbos79e28422016-04-29 08:48:05 -070018#include "webrtc/base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019
20namespace rtc {
andrespcdf61722016-07-08 02:45:40 -070021namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000022
// Grace period, in milliseconds, added to the posting time of a
// time-sensitive message; Get() warns when such a message is pulled later
// than that.
constexpr int kMaxMsgLatency = 150;
// Dispatch() logs any handler invocation that takes at least this many
// milliseconds.
constexpr int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000025
andrespcdf61722016-07-08 02:45:40 -070026class SCOPED_LOCKABLE DebugNonReentrantCritScope {
27 public:
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked)
29 EXCLUSIVE_LOCK_FUNCTION(cs)
30 : cs_(cs), locked_(locked) {
31 cs_->Enter();
nisseede5da42017-01-12 05:15:36 -080032 RTC_DCHECK(!*locked_);
andrespcdf61722016-07-08 02:45:40 -070033 *locked_ = true;
34 }
35
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() {
37 *locked_ = false;
38 cs_->Leave();
39 }
40
41 private:
42 const CriticalSection* const cs_;
43 bool* locked_;
44
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope);
46};
47} // namespace
48
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000049//------------------------------------------------------------------
50// MessageQueueManager
51
deadbeef37f5ecf2017-02-27 14:06:41 -080052MessageQueueManager* MessageQueueManager::instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000053
54MessageQueueManager* MessageQueueManager::Instance() {
55 // Note: This is not thread safe, but it is first called before threads are
56 // spawned.
57 if (!instance_)
58 instance_ = new MessageQueueManager;
59 return instance_;
60}
61
62bool MessageQueueManager::IsInitialized() {
deadbeef37f5ecf2017-02-27 14:06:41 -080063 return instance_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000064}
65
andrespcdf61722016-07-08 02:45:40 -070066MessageQueueManager::MessageQueueManager() : locked_(false) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000067
68MessageQueueManager::~MessageQueueManager() {
69}
70
71void MessageQueueManager::Add(MessageQueue *message_queue) {
72 return Instance()->AddInternal(message_queue);
73}
74void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
andrespcdf61722016-07-08 02:45:40 -070075 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000076 message_queues_.push_back(message_queue);
77}
78
79void MessageQueueManager::Remove(MessageQueue *message_queue) {
80 // If there isn't a message queue manager instance, then there isn't a queue
81 // to remove.
82 if (!instance_) return;
83 return Instance()->RemoveInternal(message_queue);
84}
85void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000086 // If this is the last MessageQueue, destroy the manager as well so that
87 // we don't leak this object at program shutdown. As mentioned above, this is
88 // not thread-safe, but this should only happen at program termination (when
89 // the ThreadManager is destroyed, and threads are no longer active).
90 bool destroy = false;
91 {
andrespcdf61722016-07-08 02:45:40 -070092 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000093 std::vector<MessageQueue *>::iterator iter;
94 iter = std::find(message_queues_.begin(), message_queues_.end(),
95 message_queue);
96 if (iter != message_queues_.end()) {
97 message_queues_.erase(iter);
98 }
99 destroy = message_queues_.empty();
100 }
101 if (destroy) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800102 instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000103 delete this;
104 }
105}
106
107void MessageQueueManager::Clear(MessageHandler *handler) {
108 // If there isn't a message queue manager instance, then there aren't any
109 // queues to remove this handler from.
110 if (!instance_) return;
111 return Instance()->ClearInternal(handler);
112}
113void MessageQueueManager::ClearInternal(MessageHandler *handler) {
andrespcdf61722016-07-08 02:45:40 -0700114 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000115 std::vector<MessageQueue *>::iterator iter;
116 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
117 (*iter)->Clear(handler);
118}
119
deadbeeff5f03e82016-06-06 11:16:06 -0700120void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700121 if (!instance_) {
122 return;
123 }
deadbeeff5f03e82016-06-06 11:16:06 -0700124 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700125}
126
deadbeeff5f03e82016-06-06 11:16:06 -0700127void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700128 // This works by posting a delayed message at the current time and waiting
129 // for it to be dispatched on all queues, which will ensure that all messages
130 // that came before it were also dispatched.
131 volatile int queues_not_done = 0;
132
133 // This class is used so that whether the posted message is processed, or the
134 // message queue is simply cleared, queues_not_done gets decremented.
135 class ScopedIncrement : public MessageData {
136 public:
137 ScopedIncrement(volatile int* value) : value_(value) {
138 AtomicOps::Increment(value_);
139 }
140 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
141
142 private:
143 volatile int* value_;
144 };
145
deadbeeff5f03e82016-06-06 11:16:06 -0700146 {
andrespcdf61722016-07-08 02:45:40 -0700147 DebugNonReentrantCritScope cs(&crit_, &locked_);
deadbeeff5f03e82016-06-06 11:16:06 -0700148 for (MessageQueue* queue : message_queues_) {
pthatcher1749bc32017-02-08 13:18:00 -0800149 if (!queue->IsProcessingMessages()) {
150 // If the queue is not processing messages, it can
151 // be ignored. If we tried to post a message to it, it would be dropped
152 // or ignored.
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700153 continue;
154 }
155 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
156 new ScopedIncrement(&queues_not_done));
deadbeeff5f03e82016-06-06 11:16:06 -0700157 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700158 }
deadbeeff5f03e82016-06-06 11:16:06 -0700159 // Note: One of the message queues may have been on this thread, which is why
160 // we can't synchronously wait for queues_not_done to go to 0; we need to
161 // process messages as well.
162 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
163 rtc::Thread::Current()->ProcessMessages(0);
164 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700165}
166
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000167//------------------------------------------------------------------
168// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800169MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 13:30:05 +0200170 : fPeekKeep_(false),
171 dmsgq_next_num_(0),
172 fInitialized_(false),
173 fDestroyed_(false),
174 stop_(0),
175 ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700176 RTC_DCHECK(ss);
177 // Currently, MessageQueue holds a socket server, and is the base class for
178 // Thread. It seems like it makes more sense for Thread to hold the socket
179 // server, and provide it to the MessageQueue, since the Thread controls
180 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
181 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000182 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800183 if (init_queue) {
184 DoInit();
185 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000186}
187
danilchapbebf54c2016-04-28 01:32:48 -0700188MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
189 : MessageQueue(ss.get(), init_queue) {
190 own_ss_ = std::move(ss);
191}
192
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000193MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800194 DoDestroy();
195}
196
197void MessageQueue::DoInit() {
198 if (fInitialized_) {
199 return;
200 }
201
202 fInitialized_ = true;
203 MessageQueueManager::Add(this);
204}
205
206void MessageQueue::DoDestroy() {
207 if (fDestroyed_) {
208 return;
209 }
210
211 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000212 // The signal is done from here to ensure
213 // that it always gets called when the queue
214 // is going away.
215 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000216 MessageQueueManager::Remove(this);
deadbeef37f5ecf2017-02-27 14:06:41 -0800217 Clear(nullptr);
jbauch9ccedc32016-02-25 01:14:56 -0800218
219 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000220 if (ss_) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800221 ss_->SetMessageQueue(nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000222 }
223}
224
jbauch9ccedc32016-02-25 01:14:56 -0800225SocketServer* MessageQueue::socketserver() {
226 SharedScope ss(&ss_lock_);
227 return ss_;
228}
229
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000230void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800231 // Need to lock exclusively here to prevent simultaneous modifications from
232 // other threads. Can't be a shared lock to prevent races with other reading
233 // threads.
234 // Other places that only read "ss_" can use a shared lock as simultaneous
235 // read access is allowed.
236 ExclusiveScope es(&ss_lock_);
danilchapbebf54c2016-04-28 01:32:48 -0700237 ss_ = ss ? ss : own_ss_.get();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000238 ss_->SetMessageQueue(this);
239}
240
jbauch9ccedc32016-02-25 01:14:56 -0800241void MessageQueue::WakeUpSocketServer() {
242 SharedScope ss(&ss_lock_);
243 ss_->WakeUp();
244}
245
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000246void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200247 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 01:14:56 -0800248 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000249}
250
251bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200252 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000253}
254
pthatcher1749bc32017-02-08 13:18:00 -0800255bool MessageQueue::IsProcessingMessages() {
256 return !IsQuitting();
257}
258
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000259void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200260 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000261}
262
263bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
264 if (fPeekKeep_) {
265 *pmsg = msgPeek_;
266 return true;
267 }
268 if (!Get(pmsg, cmsWait))
269 return false;
270 msgPeek_ = *pmsg;
271 fPeekKeep_ = true;
272 return true;
273}
274
275bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
276 // Return and clear peek if present
277 // Always return the peek if it exists so there is Peek/Get symmetry
278
279 if (fPeekKeep_) {
280 *pmsg = msgPeek_;
281 fPeekKeep_ = false;
282 return true;
283 }
284
285 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
286
Honghai Zhang82d78622016-05-06 11:29:15 -0700287 int64_t cmsTotal = cmsWait;
288 int64_t cmsElapsed = 0;
289 int64_t msStart = TimeMillis();
290 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000291 while (true) {
292 // Check for sent messages
293 ReceiveSends();
294
295 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700296 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000297 bool first_pass = true;
298 while (true) {
299 // All queue operations need to be locked, but nothing else in this loop
300 // (specifically handling disposed message) can happen inside the crit.
301 // Otherwise, disposed MessageHandlers will cause deadlocks.
302 {
303 CritScope cs(&crit_);
304 // On the first pass, check for delayed messages that have been
305 // triggered and calculate the next trigger time.
306 if (first_pass) {
307 first_pass = false;
308 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700309 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000310 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
311 break;
312 }
313 msgq_.push_back(dmsgq_.top().msg_);
314 dmsgq_.pop();
315 }
316 }
317 // Pull a message off the message queue, if available.
318 if (msgq_.empty()) {
319 break;
320 } else {
321 *pmsg = msgq_.front();
322 msgq_.pop_front();
323 }
324 } // crit_ is released here.
325
326 // Log a warning for time-sensitive messages that we're late to deliver.
327 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700328 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000329 if (delay > 0) {
330 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
331 << (delay + kMaxMsgLatency) << "ms";
332 }
333 }
334 // If this was a dispose message, delete it and skip it.
335 if (MQID_DISPOSE == pmsg->message_id) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800336 RTC_DCHECK(nullptr == pmsg->phandler);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000337 delete pmsg->pdata;
338 *pmsg = Message();
339 continue;
340 }
341 return true;
342 }
343
André Susano Pinto02a57972016-07-22 13:30:05 +0200344 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000345 break;
346
347 // Which is shorter, the delay wait or the asked wait?
348
Honghai Zhang82d78622016-05-06 11:29:15 -0700349 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000350 if (cmsWait == kForever) {
351 cmsNext = cmsDelayNext;
352 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700353 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000354 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
355 cmsNext = cmsDelayNext;
356 }
357
jbauch9ccedc32016-02-25 01:14:56 -0800358 {
359 // Wait and multiplex in the meantime
360 SharedScope ss(&ss_lock_);
Honghai Zhang82d78622016-05-06 11:29:15 -0700361 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800362 return false;
363 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000364
365 // If the specified timeout expired, return
366
Honghai Zhang82d78622016-05-06 11:29:15 -0700367 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000368 cmsElapsed = TimeDiff(msCurrent, msStart);
369 if (cmsWait != kForever) {
370 if (cmsElapsed >= cmsWait)
371 return false;
372 }
373 }
374 return false;
375}
376
377void MessageQueue::ReceiveSends() {
378}
379
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700380void MessageQueue::Post(const Location& posted_from,
381 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200382 uint32_t id,
383 MessageData* pdata,
384 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200385 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000386 return;
387
388 // Keep thread safe
389 // Add the message to the end of the queue
390 // Signal for the multiplexer to return
391
jbauch9ccedc32016-02-25 01:14:56 -0800392 {
393 CritScope cs(&crit_);
394 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700395 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800396 msg.phandler = phandler;
397 msg.message_id = id;
398 msg.pdata = pdata;
399 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700400 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800401 }
402 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000403 }
jbauch9ccedc32016-02-25 01:14:56 -0800404 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000405}
406
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700407void MessageQueue::PostDelayed(const Location& posted_from,
408 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000409 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200410 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000411 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700412 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
413 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000414}
415
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700416void MessageQueue::PostAt(const Location& posted_from,
417 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000418 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200419 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000420 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700421 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700422 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700423 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700424}
425
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700426void MessageQueue::PostAt(const Location& posted_from,
427 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700428 MessageHandler* phandler,
429 uint32_t id,
430 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700431 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
432 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000433}
434
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700435void MessageQueue::DoDelayPost(const Location& posted_from,
436 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700437 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200438 MessageHandler* phandler,
439 uint32_t id,
440 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200441 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000442 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700443 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000444
445 // Keep thread safe
446 // Add to the priority queue. Gets sorted soonest first.
447 // Signal for the multiplexer to return.
448
jbauch9ccedc32016-02-25 01:14:56 -0800449 {
450 CritScope cs(&crit_);
451 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700452 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800453 msg.phandler = phandler;
454 msg.message_id = id;
455 msg.pdata = pdata;
456 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
457 dmsgq_.push(dmsg);
458 // If this message queue processes 1 message every millisecond for 50 days,
459 // we will wrap this number. Even then, only messages with identical times
460 // will be misordered, and then only briefly. This is probably ok.
nisse7ce109a2017-01-31 00:57:56 -0800461 ++dmsgq_next_num_;
462 RTC_DCHECK_NE(0, dmsgq_next_num_);
jbauch9ccedc32016-02-25 01:14:56 -0800463 }
464 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000465}
466
467int MessageQueue::GetDelay() {
468 CritScope cs(&crit_);
469
470 if (!msgq_.empty())
471 return 0;
472
473 if (!dmsgq_.empty()) {
474 int delay = TimeUntil(dmsgq_.top().msTrigger_);
475 if (delay < 0)
476 delay = 0;
477 return delay;
478 }
479
480 return kForever;
481}
482
Peter Boström0c4e06b2015-10-07 12:23:21 +0200483void MessageQueue::Clear(MessageHandler* phandler,
484 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000485 MessageList* removed) {
486 CritScope cs(&crit_);
487
488 // Remove messages with phandler
489
490 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
491 if (removed) {
492 removed->push_back(msgPeek_);
493 } else {
494 delete msgPeek_.pdata;
495 }
496 fPeekKeep_ = false;
497 }
498
499 // Remove from ordered message queue
500
501 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
502 if (it->Match(phandler, id)) {
503 if (removed) {
504 removed->push_back(*it);
505 } else {
506 delete it->pdata;
507 }
508 it = msgq_.erase(it);
509 } else {
510 ++it;
511 }
512 }
513
514 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000515
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000516 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
517 for (PriorityQueue::container_type::iterator it = new_end;
518 it != dmsgq_.container().end(); ++it) {
519 if (it->msg_.Match(phandler, id)) {
520 if (removed) {
521 removed->push_back(it->msg_);
522 } else {
523 delete it->msg_.pdata;
524 }
525 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000526 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000527 }
528 }
529 dmsgq_.container().erase(new_end, dmsgq_.container().end());
530 dmsgq_.reheap();
531}
532
533void MessageQueue::Dispatch(Message *pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700534 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
535 pmsg->posted_from.file_and_line(), "src_func",
536 pmsg->posted_from.function_name());
537 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000538 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700539 int64_t end_time = TimeMillis();
540 int64_t diff = TimeDiff(end_time, start_time);
541 if (diff >= kSlowDispatchLoggingThreshold) {
542 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
543 << pmsg->posted_from.ToString();
544 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000545}
546
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000547} // namespace rtc