blob: fce1906a41e0d5c5db6c07e0c5c4bd07ccf30cf8 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
Henrik Kjellander00725112017-06-30 15:14:45 +020012#include "webrtc/base/atomicops.h"
13#include "webrtc/base/checks.h"
14#include "webrtc/base/logging.h"
15#include "webrtc/base/messagequeue.h"
16#include "webrtc/base/stringencode.h"
17#include "webrtc/base/thread.h"
18#include "webrtc/base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019
20namespace rtc {
andrespcdf61722016-07-08 02:45:40 -070021namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000022
// Latency budget, in milliseconds, granted to a time-sensitive message: Post()
// stamps such a message with "now + kMaxMsgLatency" and Get() warns when the
// message is pulled off the queue after that deadline.
constexpr int kMaxMsgLatency = 150;
// Dispatch() logs any handler call that takes at least this many milliseconds.
constexpr int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000025
andrespcdf61722016-07-08 02:45:40 -070026class SCOPED_LOCKABLE DebugNonReentrantCritScope {
27 public:
28 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked)
29 EXCLUSIVE_LOCK_FUNCTION(cs)
30 : cs_(cs), locked_(locked) {
31 cs_->Enter();
nisseede5da42017-01-12 05:15:36 -080032 RTC_DCHECK(!*locked_);
andrespcdf61722016-07-08 02:45:40 -070033 *locked_ = true;
34 }
35
36 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() {
37 *locked_ = false;
38 cs_->Leave();
39 }
40
41 private:
42 const CriticalSection* const cs_;
43 bool* locked_;
44
45 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope);
46};
nissebc8feda2017-06-29 06:21:20 -070047
48class FunctorPostMessageHandler : public MessageHandler {
49 public:
50 void OnMessage(Message* msg) override {
51 RunnableData* data = static_cast<RunnableData*>(msg->pdata);
52 data->Run();
53 delete data;
54 }
55};
56
andrespcdf61722016-07-08 02:45:40 -070057} // namespace
58
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000059//------------------------------------------------------------------
60// MessageQueueManager
61
deadbeef37f5ecf2017-02-27 14:06:41 -080062MessageQueueManager* MessageQueueManager::instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000063
64MessageQueueManager* MessageQueueManager::Instance() {
65 // Note: This is not thread safe, but it is first called before threads are
66 // spawned.
67 if (!instance_)
68 instance_ = new MessageQueueManager;
69 return instance_;
70}
71
72bool MessageQueueManager::IsInitialized() {
deadbeef37f5ecf2017-02-27 14:06:41 -080073 return instance_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000074}
75
andrespcdf61722016-07-08 02:45:40 -070076MessageQueueManager::MessageQueueManager() : locked_(false) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000077
78MessageQueueManager::~MessageQueueManager() {
79}
80
81void MessageQueueManager::Add(MessageQueue *message_queue) {
82 return Instance()->AddInternal(message_queue);
83}
84void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
andrespcdf61722016-07-08 02:45:40 -070085 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000086 message_queues_.push_back(message_queue);
87}
88
89void MessageQueueManager::Remove(MessageQueue *message_queue) {
90 // If there isn't a message queue manager instance, then there isn't a queue
91 // to remove.
92 if (!instance_) return;
93 return Instance()->RemoveInternal(message_queue);
94}
95void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000096 // If this is the last MessageQueue, destroy the manager as well so that
97 // we don't leak this object at program shutdown. As mentioned above, this is
98 // not thread-safe, but this should only happen at program termination (when
99 // the ThreadManager is destroyed, and threads are no longer active).
100 bool destroy = false;
101 {
andrespcdf61722016-07-08 02:45:40 -0700102 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000103 std::vector<MessageQueue *>::iterator iter;
104 iter = std::find(message_queues_.begin(), message_queues_.end(),
105 message_queue);
106 if (iter != message_queues_.end()) {
107 message_queues_.erase(iter);
108 }
109 destroy = message_queues_.empty();
110 }
111 if (destroy) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800112 instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000113 delete this;
114 }
115}
116
117void MessageQueueManager::Clear(MessageHandler *handler) {
118 // If there isn't a message queue manager instance, then there aren't any
119 // queues to remove this handler from.
120 if (!instance_) return;
121 return Instance()->ClearInternal(handler);
122}
123void MessageQueueManager::ClearInternal(MessageHandler *handler) {
andrespcdf61722016-07-08 02:45:40 -0700124 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000125 std::vector<MessageQueue *>::iterator iter;
126 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
127 (*iter)->Clear(handler);
128}
129
deadbeeff5f03e82016-06-06 11:16:06 -0700130void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700131 if (!instance_) {
132 return;
133 }
deadbeeff5f03e82016-06-06 11:16:06 -0700134 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700135}
136
deadbeeff5f03e82016-06-06 11:16:06 -0700137void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700138 // This works by posting a delayed message at the current time and waiting
139 // for it to be dispatched on all queues, which will ensure that all messages
140 // that came before it were also dispatched.
141 volatile int queues_not_done = 0;
142
143 // This class is used so that whether the posted message is processed, or the
144 // message queue is simply cleared, queues_not_done gets decremented.
145 class ScopedIncrement : public MessageData {
146 public:
147 ScopedIncrement(volatile int* value) : value_(value) {
148 AtomicOps::Increment(value_);
149 }
150 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
151
152 private:
153 volatile int* value_;
154 };
155
deadbeeff5f03e82016-06-06 11:16:06 -0700156 {
andrespcdf61722016-07-08 02:45:40 -0700157 DebugNonReentrantCritScope cs(&crit_, &locked_);
deadbeeff5f03e82016-06-06 11:16:06 -0700158 for (MessageQueue* queue : message_queues_) {
pthatcher1749bc32017-02-08 13:18:00 -0800159 if (!queue->IsProcessingMessages()) {
160 // If the queue is not processing messages, it can
161 // be ignored. If we tried to post a message to it, it would be dropped
162 // or ignored.
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700163 continue;
164 }
165 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
166 new ScopedIncrement(&queues_not_done));
deadbeeff5f03e82016-06-06 11:16:06 -0700167 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700168 }
deadbeeff5f03e82016-06-06 11:16:06 -0700169 // Note: One of the message queues may have been on this thread, which is why
170 // we can't synchronously wait for queues_not_done to go to 0; we need to
171 // process messages as well.
172 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
173 rtc::Thread::Current()->ProcessMessages(0);
174 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700175}
176
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000177//------------------------------------------------------------------
178// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800179MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 13:30:05 +0200180 : fPeekKeep_(false),
181 dmsgq_next_num_(0),
182 fInitialized_(false),
183 fDestroyed_(false),
184 stop_(0),
185 ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700186 RTC_DCHECK(ss);
187 // Currently, MessageQueue holds a socket server, and is the base class for
188 // Thread. It seems like it makes more sense for Thread to hold the socket
189 // server, and provide it to the MessageQueue, since the Thread controls
190 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
191 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000192 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800193 if (init_queue) {
194 DoInit();
195 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000196}
197
danilchapbebf54c2016-04-28 01:32:48 -0700198MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
199 : MessageQueue(ss.get(), init_queue) {
200 own_ss_ = std::move(ss);
201}
202
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000203MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800204 DoDestroy();
205}
206
207void MessageQueue::DoInit() {
208 if (fInitialized_) {
209 return;
210 }
211
212 fInitialized_ = true;
213 MessageQueueManager::Add(this);
214}
215
216void MessageQueue::DoDestroy() {
217 if (fDestroyed_) {
218 return;
219 }
220
221 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000222 // The signal is done from here to ensure
223 // that it always gets called when the queue
224 // is going away.
225 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000226 MessageQueueManager::Remove(this);
deadbeef37f5ecf2017-02-27 14:06:41 -0800227 Clear(nullptr);
jbauch9ccedc32016-02-25 01:14:56 -0800228
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000229 if (ss_) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800230 ss_->SetMessageQueue(nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000231 }
232}
233
jbauch9ccedc32016-02-25 01:14:56 -0800234SocketServer* MessageQueue::socketserver() {
jbauch9ccedc32016-02-25 01:14:56 -0800235 return ss_;
236}
237
jbauch9ccedc32016-02-25 01:14:56 -0800238void MessageQueue::WakeUpSocketServer() {
jbauch9ccedc32016-02-25 01:14:56 -0800239 ss_->WakeUp();
240}
241
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000242void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200243 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 01:14:56 -0800244 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000245}
246
247bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200248 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000249}
250
pthatcher1749bc32017-02-08 13:18:00 -0800251bool MessageQueue::IsProcessingMessages() {
252 return !IsQuitting();
253}
254
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000255void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200256 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000257}
258
259bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
260 if (fPeekKeep_) {
261 *pmsg = msgPeek_;
262 return true;
263 }
264 if (!Get(pmsg, cmsWait))
265 return false;
266 msgPeek_ = *pmsg;
267 fPeekKeep_ = true;
268 return true;
269}
270
271bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
272 // Return and clear peek if present
273 // Always return the peek if it exists so there is Peek/Get symmetry
274
275 if (fPeekKeep_) {
276 *pmsg = msgPeek_;
277 fPeekKeep_ = false;
278 return true;
279 }
280
281 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
282
Honghai Zhang82d78622016-05-06 11:29:15 -0700283 int64_t cmsTotal = cmsWait;
284 int64_t cmsElapsed = 0;
285 int64_t msStart = TimeMillis();
286 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000287 while (true) {
288 // Check for sent messages
289 ReceiveSends();
290
291 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700292 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000293 bool first_pass = true;
294 while (true) {
295 // All queue operations need to be locked, but nothing else in this loop
296 // (specifically handling disposed message) can happen inside the crit.
297 // Otherwise, disposed MessageHandlers will cause deadlocks.
298 {
299 CritScope cs(&crit_);
300 // On the first pass, check for delayed messages that have been
301 // triggered and calculate the next trigger time.
302 if (first_pass) {
303 first_pass = false;
304 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700305 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000306 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
307 break;
308 }
309 msgq_.push_back(dmsgq_.top().msg_);
310 dmsgq_.pop();
311 }
312 }
313 // Pull a message off the message queue, if available.
314 if (msgq_.empty()) {
315 break;
316 } else {
317 *pmsg = msgq_.front();
318 msgq_.pop_front();
319 }
320 } // crit_ is released here.
321
322 // Log a warning for time-sensitive messages that we're late to deliver.
323 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700324 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000325 if (delay > 0) {
326 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
327 << (delay + kMaxMsgLatency) << "ms";
328 }
329 }
330 // If this was a dispose message, delete it and skip it.
331 if (MQID_DISPOSE == pmsg->message_id) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800332 RTC_DCHECK(nullptr == pmsg->phandler);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000333 delete pmsg->pdata;
334 *pmsg = Message();
335 continue;
336 }
337 return true;
338 }
339
André Susano Pinto02a57972016-07-22 13:30:05 +0200340 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000341 break;
342
343 // Which is shorter, the delay wait or the asked wait?
344
Honghai Zhang82d78622016-05-06 11:29:15 -0700345 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000346 if (cmsWait == kForever) {
347 cmsNext = cmsDelayNext;
348 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700349 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000350 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
351 cmsNext = cmsDelayNext;
352 }
353
jbauch9ccedc32016-02-25 01:14:56 -0800354 {
355 // Wait and multiplex in the meantime
Honghai Zhang82d78622016-05-06 11:29:15 -0700356 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800357 return false;
358 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000359
360 // If the specified timeout expired, return
361
Honghai Zhang82d78622016-05-06 11:29:15 -0700362 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000363 cmsElapsed = TimeDiff(msCurrent, msStart);
364 if (cmsWait != kForever) {
365 if (cmsElapsed >= cmsWait)
366 return false;
367 }
368 }
369 return false;
370}
371
372void MessageQueue::ReceiveSends() {
373}
374
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700375void MessageQueue::Post(const Location& posted_from,
376 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200377 uint32_t id,
378 MessageData* pdata,
379 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200380 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000381 return;
382
383 // Keep thread safe
384 // Add the message to the end of the queue
385 // Signal for the multiplexer to return
386
jbauch9ccedc32016-02-25 01:14:56 -0800387 {
388 CritScope cs(&crit_);
389 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700390 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800391 msg.phandler = phandler;
392 msg.message_id = id;
393 msg.pdata = pdata;
394 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700395 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800396 }
397 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000398 }
jbauch9ccedc32016-02-25 01:14:56 -0800399 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000400}
401
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700402void MessageQueue::PostDelayed(const Location& posted_from,
403 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000404 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200405 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000406 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700407 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
408 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000409}
410
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700411void MessageQueue::PostAt(const Location& posted_from,
412 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000413 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200414 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000415 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700416 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700417 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700418 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700419}
420
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700421void MessageQueue::PostAt(const Location& posted_from,
422 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700423 MessageHandler* phandler,
424 uint32_t id,
425 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700426 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
427 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000428}
429
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700430void MessageQueue::DoDelayPost(const Location& posted_from,
431 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700432 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200433 MessageHandler* phandler,
434 uint32_t id,
435 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200436 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000437 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700438 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000439
440 // Keep thread safe
441 // Add to the priority queue. Gets sorted soonest first.
442 // Signal for the multiplexer to return.
443
jbauch9ccedc32016-02-25 01:14:56 -0800444 {
445 CritScope cs(&crit_);
446 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700447 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800448 msg.phandler = phandler;
449 msg.message_id = id;
450 msg.pdata = pdata;
451 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
452 dmsgq_.push(dmsg);
453 // If this message queue processes 1 message every millisecond for 50 days,
454 // we will wrap this number. Even then, only messages with identical times
455 // will be misordered, and then only briefly. This is probably ok.
nisse7ce109a2017-01-31 00:57:56 -0800456 ++dmsgq_next_num_;
457 RTC_DCHECK_NE(0, dmsgq_next_num_);
jbauch9ccedc32016-02-25 01:14:56 -0800458 }
459 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000460}
461
462int MessageQueue::GetDelay() {
463 CritScope cs(&crit_);
464
465 if (!msgq_.empty())
466 return 0;
467
468 if (!dmsgq_.empty()) {
469 int delay = TimeUntil(dmsgq_.top().msTrigger_);
470 if (delay < 0)
471 delay = 0;
472 return delay;
473 }
474
475 return kForever;
476}
477
Peter Boström0c4e06b2015-10-07 12:23:21 +0200478void MessageQueue::Clear(MessageHandler* phandler,
479 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000480 MessageList* removed) {
481 CritScope cs(&crit_);
482
483 // Remove messages with phandler
484
485 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
486 if (removed) {
487 removed->push_back(msgPeek_);
488 } else {
489 delete msgPeek_.pdata;
490 }
491 fPeekKeep_ = false;
492 }
493
494 // Remove from ordered message queue
495
496 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
497 if (it->Match(phandler, id)) {
498 if (removed) {
499 removed->push_back(*it);
500 } else {
501 delete it->pdata;
502 }
503 it = msgq_.erase(it);
504 } else {
505 ++it;
506 }
507 }
508
509 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000510
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000511 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
512 for (PriorityQueue::container_type::iterator it = new_end;
513 it != dmsgq_.container().end(); ++it) {
514 if (it->msg_.Match(phandler, id)) {
515 if (removed) {
516 removed->push_back(it->msg_);
517 } else {
518 delete it->msg_.pdata;
519 }
520 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000521 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000522 }
523 }
524 dmsgq_.container().erase(new_end, dmsgq_.container().end());
525 dmsgq_.reheap();
526}
527
528void MessageQueue::Dispatch(Message *pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700529 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
530 pmsg->posted_from.file_and_line(), "src_func",
531 pmsg->posted_from.function_name());
532 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000533 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700534 int64_t end_time = TimeMillis();
535 int64_t diff = TimeDiff(end_time, start_time);
536 if (diff >= kSlowDispatchLoggingThreshold) {
537 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
538 << pmsg->posted_from.ToString();
539 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000540}
541
nissebc8feda2017-06-29 06:21:20 -0700542void MessageQueue::PostFunctorInternal(const Location& posted_from,
543 RunnableData* message_data) {
544 // Use static to ensure it outlives this scope. Safe since
545 // FunctorPostMessageHandler keeps no state.
546 static FunctorPostMessageHandler handler;
547 Post(posted_from, &handler, 0, message_data);
548}
549
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000550} // namespace rtc