/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include <algorithm>
#include <utility>  // for move

#include "rtc_base/atomicops.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/messagequeue.h"
#include "rtc_base/thread.h"
#include "rtc_base/timeutils.h"  // for TimeMillis, TimeDiff, TimeUntil
#include "rtc_base/trace_event.h"

namespace rtc {
namespace {

const int kMaxMsgLatency = 150;                // 150 ms
const int kSlowDispatchLoggingThreshold = 50;  // 50 ms

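// Scoped helper that holds |cs| for its lifetime and increments |*processing|
// while the lock is held. MessageQueueManager uses it to mark the stretch of
// code that iterates over the registered queues, so that Add()/Remove() can
// assert (via RTC_DCHECK_EQ(processing_, 0)) that the list is not mutated
// while it is being walked.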
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
 public:
  MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
      RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
      : cs_(cs), processing_(processing) {
    cs_->Enter();
    *processing_ += 1;
  }

  ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
    *processing_ -= 1;
    cs_->Leave();
  }

 private:
  const CriticalSection* const cs_;
  size_t* processing_;

  RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
};
}  // namespace

//------------------------------------------------------------------
// MessageQueueManager

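// The manager is created lazily on first use and intentionally leaked (it is
// never deleted), so the returned pointer stays valid for the rest of the
// process lifetime.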
MessageQueueManager* MessageQueueManager::Instance() {
  static MessageQueueManager* const instance = new MessageQueueManager;
  return instance;
}

MessageQueueManager::MessageQueueManager() : processing_(0) {}

MessageQueueManager::~MessageQueueManager() {}

void MessageQueueManager::Add(MessageQueue* message_queue) {
  return Instance()->AddInternal(message_queue);
}
void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
  CritScope cs(&crit_);
  // Prevent changes while the list of message queues is processed.
  RTC_DCHECK_EQ(processing_, 0);
  message_queues_.push_back(message_queue);
}

void MessageQueueManager::Remove(MessageQueue* message_queue) {
  return Instance()->RemoveInternal(message_queue);
}
void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
  {
    CritScope cs(&crit_);
    // Prevent changes while the list of message queues is processed.
    RTC_DCHECK_EQ(processing_, 0);
    std::vector<MessageQueue*>::iterator iter;
    iter = std::find(message_queues_.begin(), message_queues_.end(),
                     message_queue);
    if (iter != message_queues_.end()) {
      message_queues_.erase(iter);
    }
  }
}

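// Removes every message destined for |handler| from all registered queues,
// typically when a MessageHandler is being destroyed.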
void MessageQueueManager::Clear(MessageHandler* handler) {
  return Instance()->ClearInternal(handler);
}
void MessageQueueManager::ClearInternal(MessageHandler* handler) {
  // Deleted objects may cause re-entrant calls to ClearInternal. This is
  // allowed as the list of message queues does not change while queues are
  // cleared.
  MarkProcessingCritScope cs(&crit_, &processing_);
  for (MessageQueue* queue : message_queues_) {
    queue->Clear(handler);
  }
}

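// Test-only helper: pumps every registered queue until all messages that were
// already posted at the time of the call have been dispatched (or their queue
// has been cleared). A hypothetical test usage might look like:
//
//   some_thread->Post(RTC_FROM_HERE, &handler, kMyMessageId);
//   MessageQueueManager::ProcessAllMessageQueuesForTesting();
//   // Everything posted above has now been handled.
//
// See ProcessAllMessageQueuesInternal() below for how this is implemented.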
void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
  return Instance()->ProcessAllMessageQueuesInternal();
}

void MessageQueueManager::ProcessAllMessageQueuesInternal() {
  // This works by posting a delayed message at the current time and waiting
  // for it to be dispatched on all queues, which will ensure that all messages
  // that came before it were also dispatched.
  volatile int queues_not_done = 0;

  // This class is used so that whether the posted message is processed, or the
  // message queue is simply cleared, queues_not_done gets decremented.
  class ScopedIncrement : public MessageData {
   public:
    ScopedIncrement(volatile int* value) : value_(value) {
      AtomicOps::Increment(value_);
    }
    ~ScopedIncrement() override { AtomicOps::Decrement(value_); }

   private:
    volatile int* value_;
  };

  {
    MarkProcessingCritScope cs(&crit_, &processing_);
    for (MessageQueue* queue : message_queues_) {
      if (!queue->IsProcessingMessagesForTesting()) {
        // If the queue is not processing messages, it can be ignored. If we
        // tried to post a message to it, it would be dropped or ignored.
        continue;
      }
      queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
                         new ScopedIncrement(&queues_not_done));
    }
  }

  rtc::Thread* current = rtc::Thread::Current();
  // Note: One of the message queues may have been on this thread, which is
  // why we can't synchronously wait for queues_not_done to go to 0; we need
  // to process messages as well.
  while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
    if (current) {
      current->ProcessMessages(0);
    }
  }
}

//------------------------------------------------------------------
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
    : fPeekKeep_(false),
      dmsgq_next_num_(0),
      fInitialized_(false),
      fDestroyed_(false),
      stop_(0),
      ss_(ss) {
  RTC_DCHECK(ss);
  // Currently, MessageQueue holds a socket server, and is the base class for
  // Thread. It seems like it makes more sense for Thread to hold the socket
  // server, and provide it to the MessageQueue, since the Thread controls
  // the I/O model, and MQ is agnostic to those details. Anyway, this causes
  // messagequeue_unittest to depend on network libraries... yuck.
  ss_->SetMessageQueue(this);
  if (init_queue) {
    DoInit();
  }
}

MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
    : MessageQueue(ss.get(), init_queue) {
  own_ss_ = std::move(ss);
}

MessageQueue::~MessageQueue() {
  DoDestroy();
}

void MessageQueue::DoInit() {
  if (fInitialized_) {
    return;
  }

  fInitialized_ = true;
  MessageQueueManager::Add(this);
}

void MessageQueue::DoDestroy() {
  if (fDestroyed_) {
    return;
  }

  fDestroyed_ = true;
  // The signal is done from here to ensure
  // that it always gets called when the queue
  // is going away.
  SignalQueueDestroyed();
  MessageQueueManager::Remove(this);
  ClearInternal(nullptr, MQID_ANY, nullptr);

  if (ss_) {
    ss_->SetMessageQueue(nullptr);
  }
}

SocketServer* MessageQueue::socketserver() {
  return ss_;
}

void MessageQueue::WakeUpSocketServer() {
  ss_->WakeUp();
}

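// Quit() sets the stop flag and wakes the socket server so that a blocked
// Get() call returns; Restart() clears the flag so the queue can be used
// again.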
void MessageQueue::Quit() {
  AtomicOps::ReleaseStore(&stop_, 1);
  WakeUpSocketServer();
}

bool MessageQueue::IsQuitting() {
  return AtomicOps::AcquireLoad(&stop_) != 0;
}

bool MessageQueue::IsProcessingMessagesForTesting() {
  return !IsQuitting();
}

void MessageQueue::Restart() {
  AtomicOps::ReleaseStore(&stop_, 0);
}

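// Returns the next message without consuming it: the peeked message is cached
// and handed back by the following Peek() or Get() call.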
bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    return true;
  }
  if (!Get(pmsg, cmsWait))
    return false;
  msgPeek_ = *pmsg;
  fPeekKeep_ = true;
  return true;
}

bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
  // Return and clear peek if present
  // Always return the peek if it exists so there is Peek/Get symmetry

  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // Check for sent messages
    ReceiveSends();

    // Check for posted events
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed messages) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass = false;
          while (!dmsgq_.empty()) {
            if (msCurrent < dmsgq_.top().msTrigger_) {
              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
              break;
            }
            msgq_.push_back(dmsgq_.top().msg_);
            dmsgq_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        if (msgq_.empty()) {
          break;
        } else {
          *pmsg = msgq_.front();
          msgq_.pop_front();
        }
      }  // crit_ is released here.

      // Log a warning for time-sensitive messages that we're late to deliver.
      if (pmsg->ts_sensitive) {
        int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
        if (delay > 0) {
          RTC_LOG_F(LS_WARNING)
              << "id: " << pmsg->message_id
              << " delay: " << (delay + kMaxMsgLatency) << "ms";
        }
      }
      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE == pmsg->message_id) {
        RTC_DCHECK(nullptr == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }

    if (IsQuitting())
      break;

    // Which is shorter, the delay wait or the asked wait?

    int64_t cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsDelayNext;
    } else {
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
        cmsNext = cmsDelayNext;
    }

    {
      // Wait and multiplex in the meantime
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }

    // If the specified timeout expired, return

    msCurrent = TimeMillis();
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}

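// Intentionally a no-op at this level; subclasses (notably rtc::Thread)
// override this to deliver messages sent with Send().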
void MessageQueue::ReceiveSends() {}

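// Enqueues a message for dispatch on this queue and wakes the socket server;
// the post is silently dropped if the queue is quitting.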
void MessageQueue::Post(const Location& posted_from,
                        MessageHandler* phandler,
                        uint32_t id,
                        MessageData* pdata,
                        bool time_sensitive) {
  if (IsQuitting())
    return;

  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    if (time_sensitive) {
      msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
    }
    msgq_.push_back(msg);
  }
  WakeUpSocketServer();
}

void MessageQueue::PostDelayed(const Location& posted_from,
                               int cmsDelay,
                               MessageHandler* phandler,
                               uint32_t id,
                               MessageData* pdata) {
  return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
                     pdata);
}

void MessageQueue::PostAt(const Location& posted_from,
                          uint32_t tstamp,
                          MessageHandler* phandler,
                          uint32_t id,
                          MessageData* pdata) {
  // This should work even if it is used (unexpectedly).
  int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
  return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
}

void MessageQueue::PostAt(const Location& posted_from,
                          int64_t tstamp,
                          MessageHandler* phandler,
                          uint32_t id,
                          MessageData* pdata) {
  return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
                     pdata);
}

void MessageQueue::DoDelayPost(const Location& posted_from,
                               int64_t cmsDelay,
                               int64_t tstamp,
                               MessageHandler* phandler,
                               uint32_t id,
                               MessageData* pdata) {
  if (IsQuitting()) {
    return;
  }

  // Keep thread safe
  // Add to the priority queue. Gets sorted soonest first.
  // Signal for the multiplexer to return.

  {
    CritScope cs(&crit_);
    Message msg;
    msg.posted_from = posted_from;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    dmsgq_.push(dmsg);
    // If this message queue processes 1 message every millisecond for 50 days,
    // we will wrap this number. Even then, only messages with identical times
    // will be misordered, and then only briefly. This is probably ok.
    ++dmsgq_next_num_;
    RTC_DCHECK_NE(0, dmsgq_next_num_);
  }
  WakeUpSocketServer();
}

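// Returns 0 if a message is ready for dispatch, the number of milliseconds
// until the earliest delayed message is due, or kForever if both queues are
// empty.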
int MessageQueue::GetDelay() {
  CritScope cs(&crit_);

  if (!msgq_.empty())
    return 0;

  if (!dmsgq_.empty()) {
    int delay = TimeUntil(dmsgq_.top().msTrigger_);
    if (delay < 0)
      delay = 0;
    return delay;
  }

  return kForever;
}

void MessageQueue::Clear(MessageHandler* phandler,
                         uint32_t id,
                         MessageList* removed) {
  CritScope cs(&crit_);
  ClearInternal(phandler, id, removed);
}

void MessageQueue::ClearInternal(MessageHandler* phandler,
                                 uint32_t id,
                                 MessageList* removed) {
  // Remove messages with phandler

  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    if (removed) {
      removed->push_back(msgPeek_);
    } else {
      delete msgPeek_.pdata;
    }
    fPeekKeep_ = false;
  }

  // Remove from ordered message queue

  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    if (it->Match(phandler, id)) {
      if (removed) {
        removed->push_back(*it);
      } else {
        delete it->pdata;
      }
      it = msgq_.erase(it);
    } else {
      ++it;
    }
  }

  // Remove from priority queue. Not directly iterable, so use this approach.

  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
  for (PriorityQueue::container_type::iterator it = new_end;
       it != dmsgq_.container().end(); ++it) {
    if (it->msg_.Match(phandler, id)) {
      if (removed) {
        removed->push_back(it->msg_);
      } else {
        delete it->msg_.pdata;
      }
    } else {
      *new_end++ = *it;
    }
  }
  dmsgq_.container().erase(new_end, dmsgq_.container().end());
  dmsgq_.reheap();
}

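// Runs the handler for |pmsg| under a trace event and logs any dispatch that
// takes at least kSlowDispatchLoggingThreshold milliseconds.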
void MessageQueue::Dispatch(Message* pmsg) {
  TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
               pmsg->posted_from.file_and_line(), "src_func",
               pmsg->posted_from.function_name());
  int64_t start_time = TimeMillis();
  pmsg->phandler->OnMessage(pmsg);
  int64_t end_time = TimeMillis();
  int64_t diff = TimeDiff(end_time, start_time);
  if (diff >= kSlowDispatchLoggingThreshold) {
    RTC_LOG(LS_INFO) << "Message took " << diff
                     << "ms to dispatch. Posted from: "
                     << pmsg->posted_from.ToString();
  }
}

}  // namespace rtc