blob: c407870e6a2a01f0cc88a559989ebc2aecc53b1e [file] [log] [blame]
/*
 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
deadbeeff5f03e82016-06-06 11:16:06 -070012#include "webrtc/base/atomicops.h"
danilchapbebf54c2016-04-28 01:32:48 -070013#include "webrtc/base/checks.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000014#include "webrtc/base/common.h"
15#include "webrtc/base/logging.h"
16#include "webrtc/base/messagequeue.h"
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -070017#include "webrtc/base/stringencode.h"
deadbeeff5f03e82016-06-06 11:16:06 -070018#include "webrtc/base/thread.h"
pbos79e28422016-04-29 08:48:05 -070019#include "webrtc/base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000020
21namespace rtc {
22
// Latency budget, in milliseconds, for messages posted as time-sensitive.
// Post() stamps such messages with "now + kMaxMsgLatency" and Get() logs a
// warning when one is retrieved after that deadline.
constexpr int kMaxMsgLatency = 150;

// Dispatch() logs any handler invocation that takes at least this many
// milliseconds.
constexpr int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000025
//------------------------------------------------------------------
// MessageQueueManager
28
29MessageQueueManager* MessageQueueManager::instance_ = NULL;
30
31MessageQueueManager* MessageQueueManager::Instance() {
32 // Note: This is not thread safe, but it is first called before threads are
33 // spawned.
34 if (!instance_)
35 instance_ = new MessageQueueManager;
36 return instance_;
37}
38
39bool MessageQueueManager::IsInitialized() {
40 return instance_ != NULL;
41}
42
deadbeeff5f03e82016-06-06 11:16:06 -070043MessageQueueManager::MessageQueueManager() {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000044
45MessageQueueManager::~MessageQueueManager() {
46}
47
48void MessageQueueManager::Add(MessageQueue *message_queue) {
49 return Instance()->AddInternal(message_queue);
50}
51void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
52 // MessageQueueManager methods should be non-reentrant, so we
53 // ASSERT that is the case. If any of these ASSERT, please
54 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020055#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000056 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000057#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000058 CritScope cs(&crit_);
59 message_queues_.push_back(message_queue);
60}
61
62void MessageQueueManager::Remove(MessageQueue *message_queue) {
63 // If there isn't a message queue manager instance, then there isn't a queue
64 // to remove.
65 if (!instance_) return;
66 return Instance()->RemoveInternal(message_queue);
67}
68void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020069#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000070 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000071#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000072 // If this is the last MessageQueue, destroy the manager as well so that
73 // we don't leak this object at program shutdown. As mentioned above, this is
74 // not thread-safe, but this should only happen at program termination (when
75 // the ThreadManager is destroyed, and threads are no longer active).
76 bool destroy = false;
77 {
78 CritScope cs(&crit_);
79 std::vector<MessageQueue *>::iterator iter;
80 iter = std::find(message_queues_.begin(), message_queues_.end(),
81 message_queue);
82 if (iter != message_queues_.end()) {
83 message_queues_.erase(iter);
84 }
85 destroy = message_queues_.empty();
86 }
87 if (destroy) {
88 instance_ = NULL;
89 delete this;
90 }
91}
92
93void MessageQueueManager::Clear(MessageHandler *handler) {
94 // If there isn't a message queue manager instance, then there aren't any
95 // queues to remove this handler from.
96 if (!instance_) return;
97 return Instance()->ClearInternal(handler);
98}
99void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +0200100#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000101 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +0000102#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000103 CritScope cs(&crit_);
104 std::vector<MessageQueue *>::iterator iter;
105 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
106 (*iter)->Clear(handler);
107}
108
deadbeeff5f03e82016-06-06 11:16:06 -0700109void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700110 if (!instance_) {
111 return;
112 }
deadbeeff5f03e82016-06-06 11:16:06 -0700113 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700114}
115
deadbeeff5f03e82016-06-06 11:16:06 -0700116void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700117#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
118 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
119#endif
deadbeeff5f03e82016-06-06 11:16:06 -0700120 // Post a delayed message at the current time and wait for it to be dispatched
121 // on all queues, which will ensure that all messages that came before it were
122 // also dispatched.
123 volatile int queues_not_done;
124 auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
125 FunctorMessageHandler<void, decltype(functor)> handler(functor);
126 {
127 CritScope cs(&crit_);
128 queues_not_done = static_cast<int>(message_queues_.size());
129 for (MessageQueue* queue : message_queues_) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700130 queue->PostDelayed(RTC_FROM_HERE, 0, &handler);
deadbeeff5f03e82016-06-06 11:16:06 -0700131 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700132 }
deadbeeff5f03e82016-06-06 11:16:06 -0700133 // Note: One of the message queues may have been on this thread, which is why
134 // we can't synchronously wait for queues_not_done to go to 0; we need to
135 // process messages as well.
136 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
137 rtc::Thread::Current()->ProcessMessages(0);
138 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700139}
140
//------------------------------------------------------------------
// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800143MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
jbauch9ccedc32016-02-25 01:14:56 -0800144 : fStop_(false), fPeekKeep_(false),
145 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700146 RTC_DCHECK(ss);
147 // Currently, MessageQueue holds a socket server, and is the base class for
148 // Thread. It seems like it makes more sense for Thread to hold the socket
149 // server, and provide it to the MessageQueue, since the Thread controls
150 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
151 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000152 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800153 if (init_queue) {
154 DoInit();
155 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000156}
157
danilchapbebf54c2016-04-28 01:32:48 -0700158MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
159 : MessageQueue(ss.get(), init_queue) {
160 own_ss_ = std::move(ss);
161}
162
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000163MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800164 DoDestroy();
165}
166
167void MessageQueue::DoInit() {
168 if (fInitialized_) {
169 return;
170 }
171
172 fInitialized_ = true;
173 MessageQueueManager::Add(this);
174}
175
176void MessageQueue::DoDestroy() {
177 if (fDestroyed_) {
178 return;
179 }
180
181 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000182 // The signal is done from here to ensure
183 // that it always gets called when the queue
184 // is going away.
185 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000186 MessageQueueManager::Remove(this);
187 Clear(NULL);
jbauch9ccedc32016-02-25 01:14:56 -0800188
189 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000190 if (ss_) {
191 ss_->SetMessageQueue(NULL);
192 }
193}
194
jbauch9ccedc32016-02-25 01:14:56 -0800195SocketServer* MessageQueue::socketserver() {
196 SharedScope ss(&ss_lock_);
197 return ss_;
198}
199
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000200void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800201 // Need to lock exclusively here to prevent simultaneous modifications from
202 // other threads. Can't be a shared lock to prevent races with other reading
203 // threads.
204 // Other places that only read "ss_" can use a shared lock as simultaneous
205 // read access is allowed.
206 ExclusiveScope es(&ss_lock_);
danilchapbebf54c2016-04-28 01:32:48 -0700207 ss_ = ss ? ss : own_ss_.get();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000208 ss_->SetMessageQueue(this);
209}
210
jbauch9ccedc32016-02-25 01:14:56 -0800211void MessageQueue::WakeUpSocketServer() {
212 SharedScope ss(&ss_lock_);
213 ss_->WakeUp();
214}
215
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000216void MessageQueue::Quit() {
217 fStop_ = true;
jbauch9ccedc32016-02-25 01:14:56 -0800218 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000219}
220
221bool MessageQueue::IsQuitting() {
222 return fStop_;
223}
224
225void MessageQueue::Restart() {
226 fStop_ = false;
227}
228
229bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
230 if (fPeekKeep_) {
231 *pmsg = msgPeek_;
232 return true;
233 }
234 if (!Get(pmsg, cmsWait))
235 return false;
236 msgPeek_ = *pmsg;
237 fPeekKeep_ = true;
238 return true;
239}
240
241bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
242 // Return and clear peek if present
243 // Always return the peek if it exists so there is Peek/Get symmetry
244
245 if (fPeekKeep_) {
246 *pmsg = msgPeek_;
247 fPeekKeep_ = false;
248 return true;
249 }
250
251 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
252
Honghai Zhang82d78622016-05-06 11:29:15 -0700253 int64_t cmsTotal = cmsWait;
254 int64_t cmsElapsed = 0;
255 int64_t msStart = TimeMillis();
256 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000257 while (true) {
258 // Check for sent messages
259 ReceiveSends();
260
261 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700262 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000263 bool first_pass = true;
264 while (true) {
265 // All queue operations need to be locked, but nothing else in this loop
266 // (specifically handling disposed message) can happen inside the crit.
267 // Otherwise, disposed MessageHandlers will cause deadlocks.
268 {
269 CritScope cs(&crit_);
270 // On the first pass, check for delayed messages that have been
271 // triggered and calculate the next trigger time.
272 if (first_pass) {
273 first_pass = false;
274 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700275 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000276 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
277 break;
278 }
279 msgq_.push_back(dmsgq_.top().msg_);
280 dmsgq_.pop();
281 }
282 }
283 // Pull a message off the message queue, if available.
284 if (msgq_.empty()) {
285 break;
286 } else {
287 *pmsg = msgq_.front();
288 msgq_.pop_front();
289 }
290 } // crit_ is released here.
291
292 // Log a warning for time-sensitive messages that we're late to deliver.
293 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700294 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000295 if (delay > 0) {
296 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
297 << (delay + kMaxMsgLatency) << "ms";
298 }
299 }
300 // If this was a dispose message, delete it and skip it.
301 if (MQID_DISPOSE == pmsg->message_id) {
302 ASSERT(NULL == pmsg->phandler);
303 delete pmsg->pdata;
304 *pmsg = Message();
305 continue;
306 }
307 return true;
308 }
309
310 if (fStop_)
311 break;
312
313 // Which is shorter, the delay wait or the asked wait?
314
Honghai Zhang82d78622016-05-06 11:29:15 -0700315 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000316 if (cmsWait == kForever) {
317 cmsNext = cmsDelayNext;
318 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700319 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000320 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
321 cmsNext = cmsDelayNext;
322 }
323
jbauch9ccedc32016-02-25 01:14:56 -0800324 {
325 // Wait and multiplex in the meantime
326 SharedScope ss(&ss_lock_);
Honghai Zhang82d78622016-05-06 11:29:15 -0700327 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800328 return false;
329 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000330
331 // If the specified timeout expired, return
332
Honghai Zhang82d78622016-05-06 11:29:15 -0700333 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000334 cmsElapsed = TimeDiff(msCurrent, msStart);
335 if (cmsWait != kForever) {
336 if (cmsElapsed >= cmsWait)
337 return false;
338 }
339 }
340 return false;
341}
342
343void MessageQueue::ReceiveSends() {
344}
345
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700346void MessageQueue::Post(const Location& posted_from,
347 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200348 uint32_t id,
349 MessageData* pdata,
350 bool time_sensitive) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000351 if (fStop_)
352 return;
353
354 // Keep thread safe
355 // Add the message to the end of the queue
356 // Signal for the multiplexer to return
357
jbauch9ccedc32016-02-25 01:14:56 -0800358 {
359 CritScope cs(&crit_);
360 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700361 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800362 msg.phandler = phandler;
363 msg.message_id = id;
364 msg.pdata = pdata;
365 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700366 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800367 }
368 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000369 }
jbauch9ccedc32016-02-25 01:14:56 -0800370 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000371}
372
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700373void MessageQueue::PostDelayed(const Location& posted_from,
374 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000375 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200376 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000377 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700378 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
379 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000380}
381
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700382void MessageQueue::PostAt(const Location& posted_from,
383 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000384 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200385 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000386 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700387 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700388 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700389 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700390}
391
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700392void MessageQueue::PostAt(const Location& posted_from,
393 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700394 MessageHandler* phandler,
395 uint32_t id,
396 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700397 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
398 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000399}
400
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700401void MessageQueue::DoDelayPost(const Location& posted_from,
402 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700403 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200404 MessageHandler* phandler,
405 uint32_t id,
406 MessageData* pdata) {
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700407 if (fStop_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000408 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700409 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000410
411 // Keep thread safe
412 // Add to the priority queue. Gets sorted soonest first.
413 // Signal for the multiplexer to return.
414
jbauch9ccedc32016-02-25 01:14:56 -0800415 {
416 CritScope cs(&crit_);
417 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700418 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800419 msg.phandler = phandler;
420 msg.message_id = id;
421 msg.pdata = pdata;
422 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
423 dmsgq_.push(dmsg);
424 // If this message queue processes 1 message every millisecond for 50 days,
425 // we will wrap this number. Even then, only messages with identical times
426 // will be misordered, and then only briefly. This is probably ok.
427 VERIFY(0 != ++dmsgq_next_num_);
428 }
429 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000430}
431
432int MessageQueue::GetDelay() {
433 CritScope cs(&crit_);
434
435 if (!msgq_.empty())
436 return 0;
437
438 if (!dmsgq_.empty()) {
439 int delay = TimeUntil(dmsgq_.top().msTrigger_);
440 if (delay < 0)
441 delay = 0;
442 return delay;
443 }
444
445 return kForever;
446}
447
Peter Boström0c4e06b2015-10-07 12:23:21 +0200448void MessageQueue::Clear(MessageHandler* phandler,
449 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000450 MessageList* removed) {
451 CritScope cs(&crit_);
452
453 // Remove messages with phandler
454
455 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
456 if (removed) {
457 removed->push_back(msgPeek_);
458 } else {
459 delete msgPeek_.pdata;
460 }
461 fPeekKeep_ = false;
462 }
463
464 // Remove from ordered message queue
465
466 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
467 if (it->Match(phandler, id)) {
468 if (removed) {
469 removed->push_back(*it);
470 } else {
471 delete it->pdata;
472 }
473 it = msgq_.erase(it);
474 } else {
475 ++it;
476 }
477 }
478
479 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000480
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000481 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
482 for (PriorityQueue::container_type::iterator it = new_end;
483 it != dmsgq_.container().end(); ++it) {
484 if (it->msg_.Match(phandler, id)) {
485 if (removed) {
486 removed->push_back(it->msg_);
487 } else {
488 delete it->msg_.pdata;
489 }
490 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000491 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000492 }
493 }
494 dmsgq_.container().erase(new_end, dmsgq_.container().end());
495 dmsgq_.reheap();
496}
497
498void MessageQueue::Dispatch(Message *pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700499 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
500 pmsg->posted_from.file_and_line(), "src_func",
501 pmsg->posted_from.function_name());
502 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000503 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700504 int64_t end_time = TimeMillis();
505 int64_t diff = TimeDiff(end_time, start_time);
506 if (diff >= kSlowDispatchLoggingThreshold) {
507 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
508 << pmsg->posted_from.ToString();
509 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000510}
511
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000512} // namespace rtc