blob: fac7609cbf23af709f53c7b061d626c4b0fd266b [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
#include <algorithm>
#include <limits>

#include "webrtc/rtc_base/atomicops.h"
#include "webrtc/rtc_base/checks.h"
#include "webrtc/rtc_base/logging.h"
#include "webrtc/rtc_base/messagequeue.h"
#include "webrtc/rtc_base/stringencode.h"
#include "webrtc/rtc_base/thread.h"
#include "webrtc/rtc_base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019
20namespace rtc {
andrespcdf61722016-07-08 02:45:40 -070021namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000022
// Latency budget, in milliseconds, added to the posting time of a
// time-sensitive message to form its delivery deadline.
constexpr int kMaxMsgLatency = 150;
// A dispatch that takes at least this many milliseconds is logged.
constexpr int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000025
jbauch5b361732017-07-06 23:51:37 -070026class SCOPED_LOCKABLE MarkProcessingCritScope {
andrespcdf61722016-07-08 02:45:40 -070027 public:
jbauch5b361732017-07-06 23:51:37 -070028 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
andrespcdf61722016-07-08 02:45:40 -070029 EXCLUSIVE_LOCK_FUNCTION(cs)
jbauch5b361732017-07-06 23:51:37 -070030 : cs_(cs), processing_(processing) {
andrespcdf61722016-07-08 02:45:40 -070031 cs_->Enter();
jbauch5b361732017-07-06 23:51:37 -070032 *processing_ += 1;
andrespcdf61722016-07-08 02:45:40 -070033 }
34
jbauch5b361732017-07-06 23:51:37 -070035 ~MarkProcessingCritScope() UNLOCK_FUNCTION() {
36 *processing_ -= 1;
andrespcdf61722016-07-08 02:45:40 -070037 cs_->Leave();
38 }
39
40 private:
41 const CriticalSection* const cs_;
jbauch5b361732017-07-06 23:51:37 -070042 size_t* processing_;
andrespcdf61722016-07-08 02:45:40 -070043
jbauch5b361732017-07-06 23:51:37 -070044 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
andrespcdf61722016-07-08 02:45:40 -070045};
nissebc8feda2017-06-29 06:21:20 -070046
47class FunctorPostMessageHandler : public MessageHandler {
48 public:
49 void OnMessage(Message* msg) override {
50 RunnableData* data = static_cast<RunnableData*>(msg->pdata);
51 data->Run();
52 delete data;
53 }
54};
55
andrespcdf61722016-07-08 02:45:40 -070056} // namespace
57
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000058//------------------------------------------------------------------
59// MessageQueueManager
60
deadbeef37f5ecf2017-02-27 14:06:41 -080061MessageQueueManager* MessageQueueManager::instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000062
63MessageQueueManager* MessageQueueManager::Instance() {
64 // Note: This is not thread safe, but it is first called before threads are
65 // spawned.
66 if (!instance_)
67 instance_ = new MessageQueueManager;
68 return instance_;
69}
70
71bool MessageQueueManager::IsInitialized() {
deadbeef37f5ecf2017-02-27 14:06:41 -080072 return instance_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000073}
74
jbauch5b361732017-07-06 23:51:37 -070075MessageQueueManager::MessageQueueManager() : processing_(0) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000076
77MessageQueueManager::~MessageQueueManager() {
78}
79
80void MessageQueueManager::Add(MessageQueue *message_queue) {
81 return Instance()->AddInternal(message_queue);
82}
83void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
jbauch5b361732017-07-06 23:51:37 -070084 CritScope cs(&crit_);
85 // Prevent changes while the list of message queues is processed.
86 RTC_DCHECK_EQ(processing_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000087 message_queues_.push_back(message_queue);
88}
89
90void MessageQueueManager::Remove(MessageQueue *message_queue) {
91 // If there isn't a message queue manager instance, then there isn't a queue
92 // to remove.
93 if (!instance_) return;
94 return Instance()->RemoveInternal(message_queue);
95}
96void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000097 // If this is the last MessageQueue, destroy the manager as well so that
98 // we don't leak this object at program shutdown. As mentioned above, this is
99 // not thread-safe, but this should only happen at program termination (when
100 // the ThreadManager is destroyed, and threads are no longer active).
101 bool destroy = false;
102 {
jbauch5b361732017-07-06 23:51:37 -0700103 CritScope cs(&crit_);
104 // Prevent changes while the list of message queues is processed.
105 RTC_DCHECK_EQ(processing_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000106 std::vector<MessageQueue *>::iterator iter;
107 iter = std::find(message_queues_.begin(), message_queues_.end(),
108 message_queue);
109 if (iter != message_queues_.end()) {
110 message_queues_.erase(iter);
111 }
112 destroy = message_queues_.empty();
113 }
114 if (destroy) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800115 instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000116 delete this;
117 }
118}
119
120void MessageQueueManager::Clear(MessageHandler *handler) {
121 // If there isn't a message queue manager instance, then there aren't any
122 // queues to remove this handler from.
123 if (!instance_) return;
124 return Instance()->ClearInternal(handler);
125}
126void MessageQueueManager::ClearInternal(MessageHandler *handler) {
jbauch5b361732017-07-06 23:51:37 -0700127 // Deleted objects may cause re-entrant calls to ClearInternal. This is
128 // allowed as the list of message queues does not change while queues are
129 // cleared.
130 MarkProcessingCritScope cs(&crit_, &processing_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000131 std::vector<MessageQueue *>::iterator iter;
jbauch5b361732017-07-06 23:51:37 -0700132 for (MessageQueue* queue : message_queues_) {
133 queue->Clear(handler);
134 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000135}
136
deadbeeff5f03e82016-06-06 11:16:06 -0700137void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700138 if (!instance_) {
139 return;
140 }
deadbeeff5f03e82016-06-06 11:16:06 -0700141 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700142}
143
deadbeeff5f03e82016-06-06 11:16:06 -0700144void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700145 // This works by posting a delayed message at the current time and waiting
146 // for it to be dispatched on all queues, which will ensure that all messages
147 // that came before it were also dispatched.
148 volatile int queues_not_done = 0;
149
150 // This class is used so that whether the posted message is processed, or the
151 // message queue is simply cleared, queues_not_done gets decremented.
152 class ScopedIncrement : public MessageData {
153 public:
154 ScopedIncrement(volatile int* value) : value_(value) {
155 AtomicOps::Increment(value_);
156 }
157 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
158
159 private:
160 volatile int* value_;
161 };
162
deadbeeff5f03e82016-06-06 11:16:06 -0700163 {
jbauch5b361732017-07-06 23:51:37 -0700164 MarkProcessingCritScope cs(&crit_, &processing_);
deadbeeff5f03e82016-06-06 11:16:06 -0700165 for (MessageQueue* queue : message_queues_) {
pthatcher1749bc32017-02-08 13:18:00 -0800166 if (!queue->IsProcessingMessages()) {
167 // If the queue is not processing messages, it can
168 // be ignored. If we tried to post a message to it, it would be dropped
169 // or ignored.
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700170 continue;
171 }
172 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
173 new ScopedIncrement(&queues_not_done));
deadbeeff5f03e82016-06-06 11:16:06 -0700174 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700175 }
deadbeeff5f03e82016-06-06 11:16:06 -0700176 // Note: One of the message queues may have been on this thread, which is why
177 // we can't synchronously wait for queues_not_done to go to 0; we need to
178 // process messages as well.
179 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
180 rtc::Thread::Current()->ProcessMessages(0);
181 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700182}
183
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000184//------------------------------------------------------------------
185// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800186MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 13:30:05 +0200187 : fPeekKeep_(false),
188 dmsgq_next_num_(0),
189 fInitialized_(false),
190 fDestroyed_(false),
191 stop_(0),
192 ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700193 RTC_DCHECK(ss);
194 // Currently, MessageQueue holds a socket server, and is the base class for
195 // Thread. It seems like it makes more sense for Thread to hold the socket
196 // server, and provide it to the MessageQueue, since the Thread controls
197 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
198 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000199 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800200 if (init_queue) {
201 DoInit();
202 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000203}
204
danilchapbebf54c2016-04-28 01:32:48 -0700205MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
206 : MessageQueue(ss.get(), init_queue) {
207 own_ss_ = std::move(ss);
208}
209
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000210MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800211 DoDestroy();
212}
213
214void MessageQueue::DoInit() {
215 if (fInitialized_) {
216 return;
217 }
218
219 fInitialized_ = true;
220 MessageQueueManager::Add(this);
221}
222
223void MessageQueue::DoDestroy() {
224 if (fDestroyed_) {
225 return;
226 }
227
228 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000229 // The signal is done from here to ensure
230 // that it always gets called when the queue
231 // is going away.
232 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000233 MessageQueueManager::Remove(this);
deadbeef37f5ecf2017-02-27 14:06:41 -0800234 Clear(nullptr);
jbauch9ccedc32016-02-25 01:14:56 -0800235
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000236 if (ss_) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800237 ss_->SetMessageQueue(nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000238 }
239}
240
jbauch9ccedc32016-02-25 01:14:56 -0800241SocketServer* MessageQueue::socketserver() {
jbauch9ccedc32016-02-25 01:14:56 -0800242 return ss_;
243}
244
jbauch9ccedc32016-02-25 01:14:56 -0800245void MessageQueue::WakeUpSocketServer() {
jbauch9ccedc32016-02-25 01:14:56 -0800246 ss_->WakeUp();
247}
248
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000249void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200250 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 01:14:56 -0800251 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000252}
253
254bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200255 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000256}
257
pthatcher1749bc32017-02-08 13:18:00 -0800258bool MessageQueue::IsProcessingMessages() {
259 return !IsQuitting();
260}
261
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000262void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200263 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000264}
265
266bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
267 if (fPeekKeep_) {
268 *pmsg = msgPeek_;
269 return true;
270 }
271 if (!Get(pmsg, cmsWait))
272 return false;
273 msgPeek_ = *pmsg;
274 fPeekKeep_ = true;
275 return true;
276}
277
278bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
279 // Return and clear peek if present
280 // Always return the peek if it exists so there is Peek/Get symmetry
281
282 if (fPeekKeep_) {
283 *pmsg = msgPeek_;
284 fPeekKeep_ = false;
285 return true;
286 }
287
288 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
289
Honghai Zhang82d78622016-05-06 11:29:15 -0700290 int64_t cmsTotal = cmsWait;
291 int64_t cmsElapsed = 0;
292 int64_t msStart = TimeMillis();
293 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000294 while (true) {
295 // Check for sent messages
296 ReceiveSends();
297
298 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700299 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000300 bool first_pass = true;
301 while (true) {
302 // All queue operations need to be locked, but nothing else in this loop
303 // (specifically handling disposed message) can happen inside the crit.
304 // Otherwise, disposed MessageHandlers will cause deadlocks.
305 {
306 CritScope cs(&crit_);
307 // On the first pass, check for delayed messages that have been
308 // triggered and calculate the next trigger time.
309 if (first_pass) {
310 first_pass = false;
311 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700312 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000313 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
314 break;
315 }
316 msgq_.push_back(dmsgq_.top().msg_);
317 dmsgq_.pop();
318 }
319 }
320 // Pull a message off the message queue, if available.
321 if (msgq_.empty()) {
322 break;
323 } else {
324 *pmsg = msgq_.front();
325 msgq_.pop_front();
326 }
327 } // crit_ is released here.
328
329 // Log a warning for time-sensitive messages that we're late to deliver.
330 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700331 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000332 if (delay > 0) {
333 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
334 << (delay + kMaxMsgLatency) << "ms";
335 }
336 }
337 // If this was a dispose message, delete it and skip it.
338 if (MQID_DISPOSE == pmsg->message_id) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800339 RTC_DCHECK(nullptr == pmsg->phandler);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000340 delete pmsg->pdata;
341 *pmsg = Message();
342 continue;
343 }
344 return true;
345 }
346
André Susano Pinto02a57972016-07-22 13:30:05 +0200347 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000348 break;
349
350 // Which is shorter, the delay wait or the asked wait?
351
Honghai Zhang82d78622016-05-06 11:29:15 -0700352 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000353 if (cmsWait == kForever) {
354 cmsNext = cmsDelayNext;
355 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700356 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000357 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
358 cmsNext = cmsDelayNext;
359 }
360
jbauch9ccedc32016-02-25 01:14:56 -0800361 {
362 // Wait and multiplex in the meantime
Honghai Zhang82d78622016-05-06 11:29:15 -0700363 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800364 return false;
365 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000366
367 // If the specified timeout expired, return
368
Honghai Zhang82d78622016-05-06 11:29:15 -0700369 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000370 cmsElapsed = TimeDiff(msCurrent, msStart);
371 if (cmsWait != kForever) {
372 if (cmsElapsed >= cmsWait)
373 return false;
374 }
375 }
376 return false;
377}
378
379void MessageQueue::ReceiveSends() {
380}
381
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700382void MessageQueue::Post(const Location& posted_from,
383 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200384 uint32_t id,
385 MessageData* pdata,
386 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200387 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000388 return;
389
390 // Keep thread safe
391 // Add the message to the end of the queue
392 // Signal for the multiplexer to return
393
jbauch9ccedc32016-02-25 01:14:56 -0800394 {
395 CritScope cs(&crit_);
396 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700397 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800398 msg.phandler = phandler;
399 msg.message_id = id;
400 msg.pdata = pdata;
401 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700402 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800403 }
404 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000405 }
jbauch9ccedc32016-02-25 01:14:56 -0800406 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000407}
408
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700409void MessageQueue::PostDelayed(const Location& posted_from,
410 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000411 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200412 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000413 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700414 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
415 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000416}
417
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700418void MessageQueue::PostAt(const Location& posted_from,
419 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000420 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200421 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000422 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700423 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700424 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700425 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700426}
427
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700428void MessageQueue::PostAt(const Location& posted_from,
429 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700430 MessageHandler* phandler,
431 uint32_t id,
432 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700433 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
434 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000435}
436
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700437void MessageQueue::DoDelayPost(const Location& posted_from,
438 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700439 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200440 MessageHandler* phandler,
441 uint32_t id,
442 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200443 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000444 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700445 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000446
447 // Keep thread safe
448 // Add to the priority queue. Gets sorted soonest first.
449 // Signal for the multiplexer to return.
450
jbauch9ccedc32016-02-25 01:14:56 -0800451 {
452 CritScope cs(&crit_);
453 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700454 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800455 msg.phandler = phandler;
456 msg.message_id = id;
457 msg.pdata = pdata;
458 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
459 dmsgq_.push(dmsg);
460 // If this message queue processes 1 message every millisecond for 50 days,
461 // we will wrap this number. Even then, only messages with identical times
462 // will be misordered, and then only briefly. This is probably ok.
nisse7ce109a2017-01-31 00:57:56 -0800463 ++dmsgq_next_num_;
464 RTC_DCHECK_NE(0, dmsgq_next_num_);
jbauch9ccedc32016-02-25 01:14:56 -0800465 }
466 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000467}
468
469int MessageQueue::GetDelay() {
470 CritScope cs(&crit_);
471
472 if (!msgq_.empty())
473 return 0;
474
475 if (!dmsgq_.empty()) {
476 int delay = TimeUntil(dmsgq_.top().msTrigger_);
477 if (delay < 0)
478 delay = 0;
479 return delay;
480 }
481
482 return kForever;
483}
484
Peter Boström0c4e06b2015-10-07 12:23:21 +0200485void MessageQueue::Clear(MessageHandler* phandler,
486 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000487 MessageList* removed) {
488 CritScope cs(&crit_);
489
490 // Remove messages with phandler
491
492 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
493 if (removed) {
494 removed->push_back(msgPeek_);
495 } else {
496 delete msgPeek_.pdata;
497 }
498 fPeekKeep_ = false;
499 }
500
501 // Remove from ordered message queue
502
503 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
504 if (it->Match(phandler, id)) {
505 if (removed) {
506 removed->push_back(*it);
507 } else {
508 delete it->pdata;
509 }
510 it = msgq_.erase(it);
511 } else {
512 ++it;
513 }
514 }
515
516 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000517
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000518 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
519 for (PriorityQueue::container_type::iterator it = new_end;
520 it != dmsgq_.container().end(); ++it) {
521 if (it->msg_.Match(phandler, id)) {
522 if (removed) {
523 removed->push_back(it->msg_);
524 } else {
525 delete it->msg_.pdata;
526 }
527 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000528 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000529 }
530 }
531 dmsgq_.container().erase(new_end, dmsgq_.container().end());
532 dmsgq_.reheap();
533}
534
535void MessageQueue::Dispatch(Message *pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700536 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
537 pmsg->posted_from.file_and_line(), "src_func",
538 pmsg->posted_from.function_name());
539 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000540 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700541 int64_t end_time = TimeMillis();
542 int64_t diff = TimeDiff(end_time, start_time);
543 if (diff >= kSlowDispatchLoggingThreshold) {
544 LOG(LS_INFO) << "Message took " << diff << "ms to dispatch. Posted from: "
545 << pmsg->posted_from.ToString();
546 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000547}
548
nissebc8feda2017-06-29 06:21:20 -0700549void MessageQueue::PostFunctorInternal(const Location& posted_from,
550 RunnableData* message_data) {
551 // Use static to ensure it outlives this scope. Safe since
552 // FunctorPostMessageHandler keeps no state.
553 static FunctorPostMessageHandler handler;
554 Post(posted_from, &handler, 0, message_data);
555}
556
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000557} // namespace rtc