blob: ebf98f58a34e3539a85bb4068bddadf8eab9c678 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
deadbeeff5f03e82016-06-06 11:16:06 -070012#include "webrtc/base/atomicops.h"
danilchapbebf54c2016-04-28 01:32:48 -070013#include "webrtc/base/checks.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000014#include "webrtc/base/common.h"
15#include "webrtc/base/logging.h"
16#include "webrtc/base/messagequeue.h"
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070017#include "webrtc/base/stringencode.h"
deadbeeff5f03e82016-06-06 11:16:06 -070018#include "webrtc/base/thread.h"
pbos79e28422016-04-29 08:48:05 -070019#include "webrtc/base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000020
21namespace rtc {
andrespcdf61722016-07-08 02:45:40 -070022namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000023
// Grace period, in milliseconds, added to the post time of a time-sensitive
// message; Get() logs a warning when such a message is pulled after it.
const int kMaxMsgLatency = 150;
// Dispatch() logs any handler invocation that takes at least this many
// milliseconds.
const int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000026
andrespcdf61722016-07-08 02:45:40 -070027class SCOPED_LOCKABLE DebugNonReentrantCritScope {
28 public:
29 DebugNonReentrantCritScope(const CriticalSection* cs, bool* locked)
30 EXCLUSIVE_LOCK_FUNCTION(cs)
31 : cs_(cs), locked_(locked) {
32 cs_->Enter();
33 ASSERT(!*locked_);
34 *locked_ = true;
35 }
36
37 ~DebugNonReentrantCritScope() UNLOCK_FUNCTION() {
38 *locked_ = false;
39 cs_->Leave();
40 }
41
42 private:
43 const CriticalSection* const cs_;
44 bool* locked_;
45
46 RTC_DISALLOW_COPY_AND_ASSIGN(DebugNonReentrantCritScope);
47};
48} // namespace
49
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000050//------------------------------------------------------------------
51// MessageQueueManager
52
53MessageQueueManager* MessageQueueManager::instance_ = NULL;
54
55MessageQueueManager* MessageQueueManager::Instance() {
56 // Note: This is not thread safe, but it is first called before threads are
57 // spawned.
58 if (!instance_)
59 instance_ = new MessageQueueManager;
60 return instance_;
61}
62
63bool MessageQueueManager::IsInitialized() {
64 return instance_ != NULL;
65}
66
andrespcdf61722016-07-08 02:45:40 -070067MessageQueueManager::MessageQueueManager() : locked_(false) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000068
69MessageQueueManager::~MessageQueueManager() {
70}
71
72void MessageQueueManager::Add(MessageQueue *message_queue) {
73 return Instance()->AddInternal(message_queue);
74}
75void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
andrespcdf61722016-07-08 02:45:40 -070076 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000077 message_queues_.push_back(message_queue);
78}
79
80void MessageQueueManager::Remove(MessageQueue *message_queue) {
81 // If there isn't a message queue manager instance, then there isn't a queue
82 // to remove.
83 if (!instance_) return;
84 return Instance()->RemoveInternal(message_queue);
85}
86void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000087 // If this is the last MessageQueue, destroy the manager as well so that
88 // we don't leak this object at program shutdown. As mentioned above, this is
89 // not thread-safe, but this should only happen at program termination (when
90 // the ThreadManager is destroyed, and threads are no longer active).
91 bool destroy = false;
92 {
andrespcdf61722016-07-08 02:45:40 -070093 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000094 std::vector<MessageQueue *>::iterator iter;
95 iter = std::find(message_queues_.begin(), message_queues_.end(),
96 message_queue);
97 if (iter != message_queues_.end()) {
98 message_queues_.erase(iter);
99 }
100 destroy = message_queues_.empty();
101 }
102 if (destroy) {
103 instance_ = NULL;
104 delete this;
105 }
106}
107
108void MessageQueueManager::Clear(MessageHandler *handler) {
109 // If there isn't a message queue manager instance, then there aren't any
110 // queues to remove this handler from.
111 if (!instance_) return;
112 return Instance()->ClearInternal(handler);
113}
114void MessageQueueManager::ClearInternal(MessageHandler *handler) {
andrespcdf61722016-07-08 02:45:40 -0700115 DebugNonReentrantCritScope cs(&crit_, &locked_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000116 std::vector<MessageQueue *>::iterator iter;
117 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
118 (*iter)->Clear(handler);
119}
120
deadbeeff5f03e82016-06-06 11:16:06 -0700121void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700122 if (!instance_) {
123 return;
124 }
deadbeeff5f03e82016-06-06 11:16:06 -0700125 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700126}
127
deadbeeff5f03e82016-06-06 11:16:06 -0700128void MessageQueueManager::ProcessAllMessageQueuesInternal() {
deadbeeff5f03e82016-06-06 11:16:06 -0700129 // Post a delayed message at the current time and wait for it to be dispatched
130 // on all queues, which will ensure that all messages that came before it were
131 // also dispatched.
132 volatile int queues_not_done;
133 auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
134 FunctorMessageHandler<void, decltype(functor)> handler(functor);
135 {
andrespcdf61722016-07-08 02:45:40 -0700136 DebugNonReentrantCritScope cs(&crit_, &locked_);
deadbeeff5f03e82016-06-06 11:16:06 -0700137 queues_not_done = static_cast<int>(message_queues_.size());
138 for (MessageQueue* queue : message_queues_) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700139 queue->PostDelayed(RTC_FROM_HERE, 0, &handler);
deadbeeff5f03e82016-06-06 11:16:06 -0700140 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700141 }
deadbeeff5f03e82016-06-06 11:16:06 -0700142 // Note: One of the message queues may have been on this thread, which is why
143 // we can't synchronously wait for queues_not_done to go to 0; we need to
144 // process messages as well.
145 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
146 rtc::Thread::Current()->ProcessMessages(0);
147 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700148}
149
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000150//------------------------------------------------------------------
151// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800152MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 13:30:05 +0200153 : fPeekKeep_(false),
154 dmsgq_next_num_(0),
155 fInitialized_(false),
156 fDestroyed_(false),
157 stop_(0),
158 ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700159 RTC_DCHECK(ss);
160 // Currently, MessageQueue holds a socket server, and is the base class for
161 // Thread. It seems like it makes more sense for Thread to hold the socket
162 // server, and provide it to the MessageQueue, since the Thread controls
163 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
164 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000165 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800166 if (init_queue) {
167 DoInit();
168 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000169}
170
danilchapbebf54c2016-04-28 01:32:48 -0700171MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
172 : MessageQueue(ss.get(), init_queue) {
173 own_ss_ = std::move(ss);
174}
175
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000176MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800177 DoDestroy();
178}
179
180void MessageQueue::DoInit() {
181 if (fInitialized_) {
182 return;
183 }
184
185 fInitialized_ = true;
186 MessageQueueManager::Add(this);
187}
188
189void MessageQueue::DoDestroy() {
190 if (fDestroyed_) {
191 return;
192 }
193
194 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000195 // The signal is done from here to ensure
196 // that it always gets called when the queue
197 // is going away.
198 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000199 MessageQueueManager::Remove(this);
200 Clear(NULL);
jbauch9ccedc32016-02-25 01:14:56 -0800201
202 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000203 if (ss_) {
204 ss_->SetMessageQueue(NULL);
205 }
206}
207
jbauch9ccedc32016-02-25 01:14:56 -0800208SocketServer* MessageQueue::socketserver() {
209 SharedScope ss(&ss_lock_);
210 return ss_;
211}
212
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000213void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800214 // Need to lock exclusively here to prevent simultaneous modifications from
215 // other threads. Can't be a shared lock to prevent races with other reading
216 // threads.
217 // Other places that only read "ss_" can use a shared lock as simultaneous
218 // read access is allowed.
219 ExclusiveScope es(&ss_lock_);
danilchapbebf54c2016-04-28 01:32:48 -0700220 ss_ = ss ? ss : own_ss_.get();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000221 ss_->SetMessageQueue(this);
222}
223
jbauch9ccedc32016-02-25 01:14:56 -0800224void MessageQueue::WakeUpSocketServer() {
225 SharedScope ss(&ss_lock_);
226 ss_->WakeUp();
227}
228
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000229void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200230 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 01:14:56 -0800231 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000232}
233
234bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200235 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000236}
237
238void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200239 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000240}
241
242bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
243 if (fPeekKeep_) {
244 *pmsg = msgPeek_;
245 return true;
246 }
247 if (!Get(pmsg, cmsWait))
248 return false;
249 msgPeek_ = *pmsg;
250 fPeekKeep_ = true;
251 return true;
252}
253
254bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
255 // Return and clear peek if present
256 // Always return the peek if it exists so there is Peek/Get symmetry
257
258 if (fPeekKeep_) {
259 *pmsg = msgPeek_;
260 fPeekKeep_ = false;
261 return true;
262 }
263
264 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
265
Honghai Zhang82d78622016-05-06 11:29:15 -0700266 int64_t cmsTotal = cmsWait;
267 int64_t cmsElapsed = 0;
268 int64_t msStart = TimeMillis();
269 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000270 while (true) {
271 // Check for sent messages
272 ReceiveSends();
273
274 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700275 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000276 bool first_pass = true;
277 while (true) {
278 // All queue operations need to be locked, but nothing else in this loop
279 // (specifically handling disposed message) can happen inside the crit.
280 // Otherwise, disposed MessageHandlers will cause deadlocks.
281 {
282 CritScope cs(&crit_);
283 // On the first pass, check for delayed messages that have been
284 // triggered and calculate the next trigger time.
285 if (first_pass) {
286 first_pass = false;
287 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700288 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000289 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
290 break;
291 }
292 msgq_.push_back(dmsgq_.top().msg_);
293 dmsgq_.pop();
294 }
295 }
296 // Pull a message off the message queue, if available.
297 if (msgq_.empty()) {
298 break;
299 } else {
300 *pmsg = msgq_.front();
301 msgq_.pop_front();
302 }
303 } // crit_ is released here.
304
305 // Log a warning for time-sensitive messages that we're late to deliver.
306 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700307 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000308 if (delay > 0) {
309 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
310 << (delay + kMaxMsgLatency) << "ms";
311 }
312 }
313 // If this was a dispose message, delete it and skip it.
314 if (MQID_DISPOSE == pmsg->message_id) {
315 ASSERT(NULL == pmsg->phandler);
316 delete pmsg->pdata;
317 *pmsg = Message();
318 continue;
319 }
320 return true;
321 }
322
André Susano Pinto02a57972016-07-22 13:30:05 +0200323 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000324 break;
325
326 // Which is shorter, the delay wait or the asked wait?
327
Honghai Zhang82d78622016-05-06 11:29:15 -0700328 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000329 if (cmsWait == kForever) {
330 cmsNext = cmsDelayNext;
331 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700332 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000333 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
334 cmsNext = cmsDelayNext;
335 }
336
jbauch9ccedc32016-02-25 01:14:56 -0800337 {
338 // Wait and multiplex in the meantime
339 SharedScope ss(&ss_lock_);
Honghai Zhang82d78622016-05-06 11:29:15 -0700340 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800341 return false;
342 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000343
344 // If the specified timeout expired, return
345
Honghai Zhang82d78622016-05-06 11:29:15 -0700346 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000347 cmsElapsed = TimeDiff(msCurrent, msStart);
348 if (cmsWait != kForever) {
349 if (cmsElapsed >= cmsWait)
350 return false;
351 }
352 }
353 return false;
354}
355
356void MessageQueue::ReceiveSends() {
357}
358
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700359void MessageQueue::Post(const Location& posted_from,
360 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200361 uint32_t id,
362 MessageData* pdata,
363 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200364 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000365 return;
366
367 // Keep thread safe
368 // Add the message to the end of the queue
369 // Signal for the multiplexer to return
370
jbauch9ccedc32016-02-25 01:14:56 -0800371 {
372 CritScope cs(&crit_);
373 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700374 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800375 msg.phandler = phandler;
376 msg.message_id = id;
377 msg.pdata = pdata;
378 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700379 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800380 }
381 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000382 }
jbauch9ccedc32016-02-25 01:14:56 -0800383 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000384}
385
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700386void MessageQueue::PostDelayed(const Location& posted_from,
387 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000388 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200389 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000390 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700391 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
392 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000393}
394
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700395void MessageQueue::PostAt(const Location& posted_from,
396 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000397 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200398 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000399 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700400 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700401 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700402 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700403}
404
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700405void MessageQueue::PostAt(const Location& posted_from,
406 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700407 MessageHandler* phandler,
408 uint32_t id,
409 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700410 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
411 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000412}
413
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700414void MessageQueue::DoDelayPost(const Location& posted_from,
415 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700416 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200417 MessageHandler* phandler,
418 uint32_t id,
419 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200420 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000421 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700422 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000423
424 // Keep thread safe
425 // Add to the priority queue. Gets sorted soonest first.
426 // Signal for the multiplexer to return.
427
jbauch9ccedc32016-02-25 01:14:56 -0800428 {
429 CritScope cs(&crit_);
430 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700431 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800432 msg.phandler = phandler;
433 msg.message_id = id;
434 msg.pdata = pdata;
435 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
436 dmsgq_.push(dmsg);
437 // If this message queue processes 1 message every millisecond for 50 days,
438 // we will wrap this number. Even then, only messages with identical times
439 // will be misordered, and then only briefly. This is probably ok.
440 VERIFY(0 != ++dmsgq_next_num_);
441 }
442 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000443}
444
445int MessageQueue::GetDelay() {
446 CritScope cs(&crit_);
447
448 if (!msgq_.empty())
449 return 0;
450
451 if (!dmsgq_.empty()) {
452 int delay = TimeUntil(dmsgq_.top().msTrigger_);
453 if (delay < 0)
454 delay = 0;
455 return delay;
456 }
457
458 return kForever;
459}
460
Peter Boström0c4e06b2015-10-07 12:23:21 +0200461void MessageQueue::Clear(MessageHandler* phandler,
462 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000463 MessageList* removed) {
464 CritScope cs(&crit_);
465
466 // Remove messages with phandler
467
468 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
469 if (removed) {
470 removed->push_back(msgPeek_);
471 } else {
472 delete msgPeek_.pdata;
473 }
474 fPeekKeep_ = false;
475 }
476
477 // Remove from ordered message queue
478
479 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
480 if (it->Match(phandler, id)) {
481 if (removed) {
482 removed->push_back(*it);
483 } else {
484 delete it->pdata;
485 }
486 it = msgq_.erase(it);
487 } else {
488 ++it;
489 }
490 }
491
492 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000493
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000494 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
495 for (PriorityQueue::container_type::iterator it = new_end;
496 it != dmsgq_.container().end(); ++it) {
497 if (it->msg_.Match(phandler, id)) {
498 if (removed) {
499 removed->push_back(it->msg_);
500 } else {
501 delete it->msg_.pdata;
502 }
503 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000504 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000505 }
506 }
507 dmsgq_.container().erase(new_end, dmsgq_.container().end());
508 dmsgq_.reheap();
509}
510
511void MessageQueue::Dispatch(Message *pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700512 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
513 pmsg->posted_from.file_and_line(), "src_func",
514 pmsg->posted_from.function_name());
515 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000516 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700517 int64_t end_time = TimeMillis();
518 int64_t diff = TimeDiff(end_time, start_time);
519 if (diff >= kSlowDispatchLoggingThreshold) {
520 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
521 << pmsg->posted_from.ToString();
522 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000523}
524
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000525} // namespace rtc