blob: a561af42eb8084a74a39bebc9a448877b6233d8f [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020012#include "rtc_base/atomicops.h"
13#include "rtc_base/checks.h"
14#include "rtc_base/logging.h"
15#include "rtc_base/messagequeue.h"
16#include "rtc_base/stringencode.h"
17#include "rtc_base/thread.h"
18#include "rtc_base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019
20namespace rtc {
andrespcdf61722016-07-08 02:45:40 -070021namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000022
// Latency budget, in milliseconds, granted to time-sensitive messages before
// a late delivery is reported.
constexpr int kMaxMsgLatency = 150;
// Dispatch duration, in milliseconds, at or above which a message is logged
// as slow.
constexpr int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000025
danilchap3c6abd22017-09-06 05:46:29 -070026class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
andrespcdf61722016-07-08 02:45:40 -070027 public:
jbauch5b361732017-07-06 23:51:37 -070028 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
danilchap3c6abd22017-09-06 05:46:29 -070029 RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
jbauch5b361732017-07-06 23:51:37 -070030 : cs_(cs), processing_(processing) {
andrespcdf61722016-07-08 02:45:40 -070031 cs_->Enter();
jbauch5b361732017-07-06 23:51:37 -070032 *processing_ += 1;
andrespcdf61722016-07-08 02:45:40 -070033 }
34
danilchap3c6abd22017-09-06 05:46:29 -070035 ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
jbauch5b361732017-07-06 23:51:37 -070036 *processing_ -= 1;
andrespcdf61722016-07-08 02:45:40 -070037 cs_->Leave();
38 }
39
40 private:
41 const CriticalSection* const cs_;
jbauch5b361732017-07-06 23:51:37 -070042 size_t* processing_;
andrespcdf61722016-07-08 02:45:40 -070043
jbauch5b361732017-07-06 23:51:37 -070044 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
andrespcdf61722016-07-08 02:45:40 -070045};
46} // namespace
47
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000048//------------------------------------------------------------------
49// MessageQueueManager
50
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000051MessageQueueManager* MessageQueueManager::Instance() {
Niels Möller5e007b72018-09-07 12:35:44 +020052 static MessageQueueManager* const instance = new MessageQueueManager;
53 return instance;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000054}
55
jbauch5b361732017-07-06 23:51:37 -070056MessageQueueManager::MessageQueueManager() : processing_(0) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000057
Yves Gerey665174f2018-06-19 15:03:05 +020058MessageQueueManager::~MessageQueueManager() {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000059
Yves Gerey665174f2018-06-19 15:03:05 +020060void MessageQueueManager::Add(MessageQueue* message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000061 return Instance()->AddInternal(message_queue);
62}
Yves Gerey665174f2018-06-19 15:03:05 +020063void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
jbauch5b361732017-07-06 23:51:37 -070064 CritScope cs(&crit_);
65 // Prevent changes while the list of message queues is processed.
66 RTC_DCHECK_EQ(processing_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000067 message_queues_.push_back(message_queue);
68}
69
Yves Gerey665174f2018-06-19 15:03:05 +020070void MessageQueueManager::Remove(MessageQueue* message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000071 return Instance()->RemoveInternal(message_queue);
72}
Yves Gerey665174f2018-06-19 15:03:05 +020073void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000074 {
jbauch5b361732017-07-06 23:51:37 -070075 CritScope cs(&crit_);
76 // Prevent changes while the list of message queues is processed.
77 RTC_DCHECK_EQ(processing_, 0);
Yves Gerey665174f2018-06-19 15:03:05 +020078 std::vector<MessageQueue*>::iterator iter;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000079 iter = std::find(message_queues_.begin(), message_queues_.end(),
80 message_queue);
81 if (iter != message_queues_.end()) {
82 message_queues_.erase(iter);
83 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000084 }
85}
86
Yves Gerey665174f2018-06-19 15:03:05 +020087void MessageQueueManager::Clear(MessageHandler* handler) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000088 return Instance()->ClearInternal(handler);
89}
Yves Gerey665174f2018-06-19 15:03:05 +020090void MessageQueueManager::ClearInternal(MessageHandler* handler) {
jbauch5b361732017-07-06 23:51:37 -070091 // Deleted objects may cause re-entrant calls to ClearInternal. This is
92 // allowed as the list of message queues does not change while queues are
93 // cleared.
94 MarkProcessingCritScope cs(&crit_, &processing_);
jbauch5b361732017-07-06 23:51:37 -070095 for (MessageQueue* queue : message_queues_) {
96 queue->Clear(handler);
97 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000098}
99
Niels Möller8909a632018-09-06 08:42:44 +0200100void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
deadbeeff5f03e82016-06-06 11:16:06 -0700101 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700102}
103
deadbeeff5f03e82016-06-06 11:16:06 -0700104void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700105 // This works by posting a delayed message at the current time and waiting
106 // for it to be dispatched on all queues, which will ensure that all messages
107 // that came before it were also dispatched.
108 volatile int queues_not_done = 0;
109
110 // This class is used so that whether the posted message is processed, or the
111 // message queue is simply cleared, queues_not_done gets decremented.
112 class ScopedIncrement : public MessageData {
113 public:
114 ScopedIncrement(volatile int* value) : value_(value) {
115 AtomicOps::Increment(value_);
116 }
117 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
118
119 private:
120 volatile int* value_;
121 };
122
deadbeeff5f03e82016-06-06 11:16:06 -0700123 {
jbauch5b361732017-07-06 23:51:37 -0700124 MarkProcessingCritScope cs(&crit_, &processing_);
deadbeeff5f03e82016-06-06 11:16:06 -0700125 for (MessageQueue* queue : message_queues_) {
Niels Möller8909a632018-09-06 08:42:44 +0200126 if (!queue->IsProcessingMessagesForTesting()) {
pthatcher1749bc32017-02-08 13:18:00 -0800127 // If the queue is not processing messages, it can
128 // be ignored. If we tried to post a message to it, it would be dropped
129 // or ignored.
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700130 continue;
131 }
132 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
133 new ScopedIncrement(&queues_not_done));
deadbeeff5f03e82016-06-06 11:16:06 -0700134 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700135 }
Niels Möller8909a632018-09-06 08:42:44 +0200136
137 rtc::Thread* current = rtc::Thread::Current();
138 // Note: One of the message queues may have been on this thread, which is
139 // why we can't synchronously wait for queues_not_done to go to 0; we need
140 // to process messages as well.
Ying Wangb2940902018-09-05 09:40:40 +0000141 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
Niels Möller8909a632018-09-06 08:42:44 +0200142 if (current) {
143 current->ProcessMessages(0);
144 }
deadbeeff5f03e82016-06-06 11:16:06 -0700145 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700146}
147
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000148//------------------------------------------------------------------
149// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800150MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 13:30:05 +0200151 : fPeekKeep_(false),
152 dmsgq_next_num_(0),
153 fInitialized_(false),
154 fDestroyed_(false),
155 stop_(0),
156 ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700157 RTC_DCHECK(ss);
158 // Currently, MessageQueue holds a socket server, and is the base class for
159 // Thread. It seems like it makes more sense for Thread to hold the socket
160 // server, and provide it to the MessageQueue, since the Thread controls
161 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
162 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000163 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800164 if (init_queue) {
165 DoInit();
166 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000167}
168
danilchapbebf54c2016-04-28 01:32:48 -0700169MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
170 : MessageQueue(ss.get(), init_queue) {
171 own_ss_ = std::move(ss);
172}
173
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000174MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800175 DoDestroy();
176}
177
178void MessageQueue::DoInit() {
179 if (fInitialized_) {
180 return;
181 }
182
183 fInitialized_ = true;
184 MessageQueueManager::Add(this);
185}
186
187void MessageQueue::DoDestroy() {
188 if (fDestroyed_) {
189 return;
190 }
191
192 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000193 // The signal is done from here to ensure
194 // that it always gets called when the queue
195 // is going away.
196 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000197 MessageQueueManager::Remove(this);
Niels Möller5e007b72018-09-07 12:35:44 +0200198 ClearInternal(nullptr, MQID_ANY, nullptr);
jbauch9ccedc32016-02-25 01:14:56 -0800199
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000200 if (ss_) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800201 ss_->SetMessageQueue(nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000202 }
203}
204
jbauch9ccedc32016-02-25 01:14:56 -0800205SocketServer* MessageQueue::socketserver() {
jbauch9ccedc32016-02-25 01:14:56 -0800206 return ss_;
207}
208
jbauch9ccedc32016-02-25 01:14:56 -0800209void MessageQueue::WakeUpSocketServer() {
jbauch9ccedc32016-02-25 01:14:56 -0800210 ss_->WakeUp();
211}
212
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000213void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200214 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 01:14:56 -0800215 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000216}
217
218bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200219 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000220}
221
Niels Möller8909a632018-09-06 08:42:44 +0200222bool MessageQueue::IsProcessingMessagesForTesting() {
pthatcher1749bc32017-02-08 13:18:00 -0800223 return !IsQuitting();
224}
225
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000226void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200227 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000228}
229
Yves Gerey665174f2018-06-19 15:03:05 +0200230bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000231 if (fPeekKeep_) {
232 *pmsg = msgPeek_;
233 return true;
234 }
235 if (!Get(pmsg, cmsWait))
236 return false;
237 msgPeek_ = *pmsg;
238 fPeekKeep_ = true;
239 return true;
240}
241
Yves Gerey665174f2018-06-19 15:03:05 +0200242bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000243 // Return and clear peek if present
244 // Always return the peek if it exists so there is Peek/Get symmetry
245
246 if (fPeekKeep_) {
247 *pmsg = msgPeek_;
248 fPeekKeep_ = false;
249 return true;
250 }
251
252 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
253
Honghai Zhang82d78622016-05-06 11:29:15 -0700254 int64_t cmsTotal = cmsWait;
255 int64_t cmsElapsed = 0;
256 int64_t msStart = TimeMillis();
257 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000258 while (true) {
259 // Check for sent messages
260 ReceiveSends();
261
262 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700263 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000264 bool first_pass = true;
265 while (true) {
266 // All queue operations need to be locked, but nothing else in this loop
267 // (specifically handling disposed message) can happen inside the crit.
268 // Otherwise, disposed MessageHandlers will cause deadlocks.
269 {
270 CritScope cs(&crit_);
271 // On the first pass, check for delayed messages that have been
272 // triggered and calculate the next trigger time.
273 if (first_pass) {
274 first_pass = false;
275 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700276 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000277 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
278 break;
279 }
280 msgq_.push_back(dmsgq_.top().msg_);
281 dmsgq_.pop();
282 }
283 }
284 // Pull a message off the message queue, if available.
285 if (msgq_.empty()) {
286 break;
287 } else {
288 *pmsg = msgq_.front();
289 msgq_.pop_front();
290 }
291 } // crit_ is released here.
292
293 // Log a warning for time-sensitive messages that we're late to deliver.
294 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700295 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000296 if (delay > 0) {
Mirko Bonadei675513b2017-11-09 11:09:25 +0100297 RTC_LOG_F(LS_WARNING)
298 << "id: " << pmsg->message_id
299 << " delay: " << (delay + kMaxMsgLatency) << "ms";
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000300 }
301 }
302 // If this was a dispose message, delete it and skip it.
303 if (MQID_DISPOSE == pmsg->message_id) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800304 RTC_DCHECK(nullptr == pmsg->phandler);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000305 delete pmsg->pdata;
306 *pmsg = Message();
307 continue;
308 }
309 return true;
310 }
311
André Susano Pinto02a57972016-07-22 13:30:05 +0200312 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000313 break;
314
315 // Which is shorter, the delay wait or the asked wait?
316
Honghai Zhang82d78622016-05-06 11:29:15 -0700317 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000318 if (cmsWait == kForever) {
319 cmsNext = cmsDelayNext;
320 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700321 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000322 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
323 cmsNext = cmsDelayNext;
324 }
325
jbauch9ccedc32016-02-25 01:14:56 -0800326 {
327 // Wait and multiplex in the meantime
Honghai Zhang82d78622016-05-06 11:29:15 -0700328 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800329 return false;
330 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000331
332 // If the specified timeout expired, return
333
Honghai Zhang82d78622016-05-06 11:29:15 -0700334 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000335 cmsElapsed = TimeDiff(msCurrent, msStart);
336 if (cmsWait != kForever) {
337 if (cmsElapsed >= cmsWait)
338 return false;
339 }
340 }
341 return false;
342}
343
Yves Gerey665174f2018-06-19 15:03:05 +0200344void MessageQueue::ReceiveSends() {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000345
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700346void MessageQueue::Post(const Location& posted_from,
347 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200348 uint32_t id,
349 MessageData* pdata,
350 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200351 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000352 return;
353
354 // Keep thread safe
355 // Add the message to the end of the queue
356 // Signal for the multiplexer to return
357
jbauch9ccedc32016-02-25 01:14:56 -0800358 {
359 CritScope cs(&crit_);
360 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700361 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800362 msg.phandler = phandler;
363 msg.message_id = id;
364 msg.pdata = pdata;
365 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700366 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800367 }
368 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000369 }
jbauch9ccedc32016-02-25 01:14:56 -0800370 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000371}
372
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700373void MessageQueue::PostDelayed(const Location& posted_from,
374 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000375 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200376 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000377 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700378 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
379 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000380}
381
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700382void MessageQueue::PostAt(const Location& posted_from,
383 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000384 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200385 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000386 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700387 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700388 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700389 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700390}
391
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700392void MessageQueue::PostAt(const Location& posted_from,
393 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700394 MessageHandler* phandler,
395 uint32_t id,
396 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700397 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
398 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000399}
400
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700401void MessageQueue::DoDelayPost(const Location& posted_from,
402 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700403 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200404 MessageHandler* phandler,
405 uint32_t id,
406 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200407 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000408 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700409 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000410
411 // Keep thread safe
412 // Add to the priority queue. Gets sorted soonest first.
413 // Signal for the multiplexer to return.
414
jbauch9ccedc32016-02-25 01:14:56 -0800415 {
416 CritScope cs(&crit_);
417 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700418 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800419 msg.phandler = phandler;
420 msg.message_id = id;
421 msg.pdata = pdata;
422 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
423 dmsgq_.push(dmsg);
424 // If this message queue processes 1 message every millisecond for 50 days,
425 // we will wrap this number. Even then, only messages with identical times
426 // will be misordered, and then only briefly. This is probably ok.
nisse7ce109a2017-01-31 00:57:56 -0800427 ++dmsgq_next_num_;
428 RTC_DCHECK_NE(0, dmsgq_next_num_);
jbauch9ccedc32016-02-25 01:14:56 -0800429 }
430 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000431}
432
433int MessageQueue::GetDelay() {
434 CritScope cs(&crit_);
435
436 if (!msgq_.empty())
437 return 0;
438
439 if (!dmsgq_.empty()) {
440 int delay = TimeUntil(dmsgq_.top().msTrigger_);
441 if (delay < 0)
442 delay = 0;
443 return delay;
444 }
445
446 return kForever;
447}
448
Peter Boström0c4e06b2015-10-07 12:23:21 +0200449void MessageQueue::Clear(MessageHandler* phandler,
450 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000451 MessageList* removed) {
452 CritScope cs(&crit_);
Niels Möller5e007b72018-09-07 12:35:44 +0200453 ClearInternal(phandler, id, removed);
454}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000455
Niels Möller5e007b72018-09-07 12:35:44 +0200456void MessageQueue::ClearInternal(MessageHandler* phandler,
457 uint32_t id,
458 MessageList* removed) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000459 // Remove messages with phandler
460
461 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
462 if (removed) {
463 removed->push_back(msgPeek_);
464 } else {
465 delete msgPeek_.pdata;
466 }
467 fPeekKeep_ = false;
468 }
469
470 // Remove from ordered message queue
471
472 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
473 if (it->Match(phandler, id)) {
474 if (removed) {
475 removed->push_back(*it);
476 } else {
477 delete it->pdata;
478 }
479 it = msgq_.erase(it);
480 } else {
481 ++it;
482 }
483 }
484
485 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000486
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000487 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
488 for (PriorityQueue::container_type::iterator it = new_end;
489 it != dmsgq_.container().end(); ++it) {
490 if (it->msg_.Match(phandler, id)) {
491 if (removed) {
492 removed->push_back(it->msg_);
493 } else {
494 delete it->msg_.pdata;
495 }
496 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000497 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000498 }
499 }
500 dmsgq_.container().erase(new_end, dmsgq_.container().end());
501 dmsgq_.reheap();
502}
503
Yves Gerey665174f2018-06-19 15:03:05 +0200504void MessageQueue::Dispatch(Message* pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700505 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
506 pmsg->posted_from.file_and_line(), "src_func",
507 pmsg->posted_from.function_name());
508 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000509 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700510 int64_t end_time = TimeMillis();
511 int64_t diff = TimeDiff(end_time, start_time);
512 if (diff >= kSlowDispatchLoggingThreshold) {
Mirko Bonadei675513b2017-11-09 11:09:25 +0100513 RTC_LOG(LS_INFO) << "Message took " << diff
514 << "ms to dispatch. Posted from: "
515 << pmsg->posted_from.ToString();
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700516 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000517}
518
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000519} // namespace rtc