blob: 6ff73b5d65739d868b19d63f9b4ad6c2dda11800 [file] [log] [blame]
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include <algorithm>
#include <limits>

#include "rtc_base/atomicops.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/messagequeue.h"
#include "rtc_base/stringencode.h"
#include "rtc_base/thread.h"
#include "rtc_base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019
20namespace rtc {
andrespcdf61722016-07-08 02:45:40 -070021namespace {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000022
// Latency budget, in milliseconds, added to the posting time of a
// time-sensitive message; Get() logs a warning once that deadline has passed.
constexpr int kMaxMsgLatency = 150;
// Dispatch() logs any handler invocation that lasts at least this many
// milliseconds.
constexpr int kSlowDispatchLoggingThreshold = 50;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000025
danilchap3c6abd22017-09-06 05:46:29 -070026class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
andrespcdf61722016-07-08 02:45:40 -070027 public:
jbauch5b361732017-07-06 23:51:37 -070028 MarkProcessingCritScope(const CriticalSection* cs, size_t* processing)
danilchap3c6abd22017-09-06 05:46:29 -070029 RTC_EXCLUSIVE_LOCK_FUNCTION(cs)
jbauch5b361732017-07-06 23:51:37 -070030 : cs_(cs), processing_(processing) {
andrespcdf61722016-07-08 02:45:40 -070031 cs_->Enter();
jbauch5b361732017-07-06 23:51:37 -070032 *processing_ += 1;
andrespcdf61722016-07-08 02:45:40 -070033 }
34
danilchap3c6abd22017-09-06 05:46:29 -070035 ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() {
jbauch5b361732017-07-06 23:51:37 -070036 *processing_ -= 1;
andrespcdf61722016-07-08 02:45:40 -070037 cs_->Leave();
38 }
39
40 private:
41 const CriticalSection* const cs_;
jbauch5b361732017-07-06 23:51:37 -070042 size_t* processing_;
andrespcdf61722016-07-08 02:45:40 -070043
jbauch5b361732017-07-06 23:51:37 -070044 RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope);
andrespcdf61722016-07-08 02:45:40 -070045};
46} // namespace
47
//------------------------------------------------------------------
// MessageQueueManager
50
deadbeef37f5ecf2017-02-27 14:06:41 -080051MessageQueueManager* MessageQueueManager::instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000052
53MessageQueueManager* MessageQueueManager::Instance() {
54 // Note: This is not thread safe, but it is first called before threads are
55 // spawned.
56 if (!instance_)
57 instance_ = new MessageQueueManager;
58 return instance_;
59}
60
61bool MessageQueueManager::IsInitialized() {
deadbeef37f5ecf2017-02-27 14:06:41 -080062 return instance_ != nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000063}
64
jbauch5b361732017-07-06 23:51:37 -070065MessageQueueManager::MessageQueueManager() : processing_(0) {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000066
Yves Gerey665174f2018-06-19 15:03:05 +020067MessageQueueManager::~MessageQueueManager() {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000068
Yves Gerey665174f2018-06-19 15:03:05 +020069void MessageQueueManager::Add(MessageQueue* message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000070 return Instance()->AddInternal(message_queue);
71}
Yves Gerey665174f2018-06-19 15:03:05 +020072void MessageQueueManager::AddInternal(MessageQueue* message_queue) {
jbauch5b361732017-07-06 23:51:37 -070073 CritScope cs(&crit_);
74 // Prevent changes while the list of message queues is processed.
75 RTC_DCHECK_EQ(processing_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000076 message_queues_.push_back(message_queue);
77}
78
Yves Gerey665174f2018-06-19 15:03:05 +020079void MessageQueueManager::Remove(MessageQueue* message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000080 // If there isn't a message queue manager instance, then there isn't a queue
81 // to remove.
Yves Gerey665174f2018-06-19 15:03:05 +020082 if (!instance_)
83 return;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000084 return Instance()->RemoveInternal(message_queue);
85}
Yves Gerey665174f2018-06-19 15:03:05 +020086void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000087 // If this is the last MessageQueue, destroy the manager as well so that
88 // we don't leak this object at program shutdown. As mentioned above, this is
89 // not thread-safe, but this should only happen at program termination (when
90 // the ThreadManager is destroyed, and threads are no longer active).
91 bool destroy = false;
92 {
jbauch5b361732017-07-06 23:51:37 -070093 CritScope cs(&crit_);
94 // Prevent changes while the list of message queues is processed.
95 RTC_DCHECK_EQ(processing_, 0);
Yves Gerey665174f2018-06-19 15:03:05 +020096 std::vector<MessageQueue*>::iterator iter;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000097 iter = std::find(message_queues_.begin(), message_queues_.end(),
98 message_queue);
99 if (iter != message_queues_.end()) {
100 message_queues_.erase(iter);
101 }
102 destroy = message_queues_.empty();
103 }
104 if (destroy) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800105 instance_ = nullptr;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000106 delete this;
107 }
108}
109
Yves Gerey665174f2018-06-19 15:03:05 +0200110void MessageQueueManager::Clear(MessageHandler* handler) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000111 // If there isn't a message queue manager instance, then there aren't any
112 // queues to remove this handler from.
Yves Gerey665174f2018-06-19 15:03:05 +0200113 if (!instance_)
114 return;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000115 return Instance()->ClearInternal(handler);
116}
Yves Gerey665174f2018-06-19 15:03:05 +0200117void MessageQueueManager::ClearInternal(MessageHandler* handler) {
jbauch5b361732017-07-06 23:51:37 -0700118 // Deleted objects may cause re-entrant calls to ClearInternal. This is
119 // allowed as the list of message queues does not change while queues are
120 // cleared.
121 MarkProcessingCritScope cs(&crit_, &processing_);
jbauch5b361732017-07-06 23:51:37 -0700122 for (MessageQueue* queue : message_queues_) {
123 queue->Clear(handler);
124 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000125}
126
Niels Möller8909a632018-09-06 08:42:44 +0200127void MessageQueueManager::ProcessAllMessageQueuesForTesting() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700128 if (!instance_) {
129 return;
130 }
deadbeeff5f03e82016-06-06 11:16:06 -0700131 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700132}
133
deadbeeff5f03e82016-06-06 11:16:06 -0700134void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700135 // This works by posting a delayed message at the current time and waiting
136 // for it to be dispatched on all queues, which will ensure that all messages
137 // that came before it were also dispatched.
138 volatile int queues_not_done = 0;
139
140 // This class is used so that whether the posted message is processed, or the
141 // message queue is simply cleared, queues_not_done gets decremented.
142 class ScopedIncrement : public MessageData {
143 public:
144 ScopedIncrement(volatile int* value) : value_(value) {
145 AtomicOps::Increment(value_);
146 }
147 ~ScopedIncrement() override { AtomicOps::Decrement(value_); }
148
149 private:
150 volatile int* value_;
151 };
152
deadbeeff5f03e82016-06-06 11:16:06 -0700153 {
jbauch5b361732017-07-06 23:51:37 -0700154 MarkProcessingCritScope cs(&crit_, &processing_);
deadbeeff5f03e82016-06-06 11:16:06 -0700155 for (MessageQueue* queue : message_queues_) {
Niels Möller8909a632018-09-06 08:42:44 +0200156 if (!queue->IsProcessingMessagesForTesting()) {
pthatcher1749bc32017-02-08 13:18:00 -0800157 // If the queue is not processing messages, it can
158 // be ignored. If we tried to post a message to it, it would be dropped
159 // or ignored.
Taylor Brandstetterfe7d0912016-09-15 17:47:42 -0700160 continue;
161 }
162 queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE,
163 new ScopedIncrement(&queues_not_done));
deadbeeff5f03e82016-06-06 11:16:06 -0700164 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700165 }
Niels Möller8909a632018-09-06 08:42:44 +0200166
167 rtc::Thread* current = rtc::Thread::Current();
168 // Note: One of the message queues may have been on this thread, which is
169 // why we can't synchronously wait for queues_not_done to go to 0; we need
170 // to process messages as well.
Ying Wangb2940902018-09-05 09:40:40 +0000171 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
Niels Möller8909a632018-09-06 08:42:44 +0200172 if (current) {
173 current->ProcessMessages(0);
174 }
deadbeeff5f03e82016-06-06 11:16:06 -0700175 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700176}
177
//------------------------------------------------------------------
// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800180MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
André Susano Pinto02a57972016-07-22 13:30:05 +0200181 : fPeekKeep_(false),
182 dmsgq_next_num_(0),
183 fInitialized_(false),
184 fDestroyed_(false),
185 stop_(0),
186 ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700187 RTC_DCHECK(ss);
188 // Currently, MessageQueue holds a socket server, and is the base class for
189 // Thread. It seems like it makes more sense for Thread to hold the socket
190 // server, and provide it to the MessageQueue, since the Thread controls
191 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
192 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000193 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800194 if (init_queue) {
195 DoInit();
196 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000197}
198
danilchapbebf54c2016-04-28 01:32:48 -0700199MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
200 : MessageQueue(ss.get(), init_queue) {
201 own_ss_ = std::move(ss);
202}
203
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000204MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800205 DoDestroy();
206}
207
208void MessageQueue::DoInit() {
209 if (fInitialized_) {
210 return;
211 }
212
213 fInitialized_ = true;
214 MessageQueueManager::Add(this);
215}
216
217void MessageQueue::DoDestroy() {
218 if (fDestroyed_) {
219 return;
220 }
221
222 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000223 // The signal is done from here to ensure
224 // that it always gets called when the queue
225 // is going away.
226 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000227 MessageQueueManager::Remove(this);
deadbeef37f5ecf2017-02-27 14:06:41 -0800228 Clear(nullptr);
jbauch9ccedc32016-02-25 01:14:56 -0800229
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000230 if (ss_) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800231 ss_->SetMessageQueue(nullptr);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000232 }
233}
234
jbauch9ccedc32016-02-25 01:14:56 -0800235SocketServer* MessageQueue::socketserver() {
jbauch9ccedc32016-02-25 01:14:56 -0800236 return ss_;
237}
238
jbauch9ccedc32016-02-25 01:14:56 -0800239void MessageQueue::WakeUpSocketServer() {
jbauch9ccedc32016-02-25 01:14:56 -0800240 ss_->WakeUp();
241}
242
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000243void MessageQueue::Quit() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200244 AtomicOps::ReleaseStore(&stop_, 1);
jbauch9ccedc32016-02-25 01:14:56 -0800245 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000246}
247
248bool MessageQueue::IsQuitting() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200249 return AtomicOps::AcquireLoad(&stop_) != 0;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000250}
251
Niels Möller8909a632018-09-06 08:42:44 +0200252bool MessageQueue::IsProcessingMessagesForTesting() {
pthatcher1749bc32017-02-08 13:18:00 -0800253 return !IsQuitting();
254}
255
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000256void MessageQueue::Restart() {
André Susano Pinto02a57972016-07-22 13:30:05 +0200257 AtomicOps::ReleaseStore(&stop_, 0);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000258}
259
Yves Gerey665174f2018-06-19 15:03:05 +0200260bool MessageQueue::Peek(Message* pmsg, int cmsWait) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000261 if (fPeekKeep_) {
262 *pmsg = msgPeek_;
263 return true;
264 }
265 if (!Get(pmsg, cmsWait))
266 return false;
267 msgPeek_ = *pmsg;
268 fPeekKeep_ = true;
269 return true;
270}
271
Yves Gerey665174f2018-06-19 15:03:05 +0200272bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000273 // Return and clear peek if present
274 // Always return the peek if it exists so there is Peek/Get symmetry
275
276 if (fPeekKeep_) {
277 *pmsg = msgPeek_;
278 fPeekKeep_ = false;
279 return true;
280 }
281
282 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
283
Honghai Zhang82d78622016-05-06 11:29:15 -0700284 int64_t cmsTotal = cmsWait;
285 int64_t cmsElapsed = 0;
286 int64_t msStart = TimeMillis();
287 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000288 while (true) {
289 // Check for sent messages
290 ReceiveSends();
291
292 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700293 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000294 bool first_pass = true;
295 while (true) {
296 // All queue operations need to be locked, but nothing else in this loop
297 // (specifically handling disposed message) can happen inside the crit.
298 // Otherwise, disposed MessageHandlers will cause deadlocks.
299 {
300 CritScope cs(&crit_);
301 // On the first pass, check for delayed messages that have been
302 // triggered and calculate the next trigger time.
303 if (first_pass) {
304 first_pass = false;
305 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700306 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000307 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
308 break;
309 }
310 msgq_.push_back(dmsgq_.top().msg_);
311 dmsgq_.pop();
312 }
313 }
314 // Pull a message off the message queue, if available.
315 if (msgq_.empty()) {
316 break;
317 } else {
318 *pmsg = msgq_.front();
319 msgq_.pop_front();
320 }
321 } // crit_ is released here.
322
323 // Log a warning for time-sensitive messages that we're late to deliver.
324 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700325 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000326 if (delay > 0) {
Mirko Bonadei675513b2017-11-09 11:09:25 +0100327 RTC_LOG_F(LS_WARNING)
328 << "id: " << pmsg->message_id
329 << " delay: " << (delay + kMaxMsgLatency) << "ms";
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000330 }
331 }
332 // If this was a dispose message, delete it and skip it.
333 if (MQID_DISPOSE == pmsg->message_id) {
deadbeef37f5ecf2017-02-27 14:06:41 -0800334 RTC_DCHECK(nullptr == pmsg->phandler);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000335 delete pmsg->pdata;
336 *pmsg = Message();
337 continue;
338 }
339 return true;
340 }
341
André Susano Pinto02a57972016-07-22 13:30:05 +0200342 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000343 break;
344
345 // Which is shorter, the delay wait or the asked wait?
346
Honghai Zhang82d78622016-05-06 11:29:15 -0700347 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000348 if (cmsWait == kForever) {
349 cmsNext = cmsDelayNext;
350 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700351 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000352 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
353 cmsNext = cmsDelayNext;
354 }
355
jbauch9ccedc32016-02-25 01:14:56 -0800356 {
357 // Wait and multiplex in the meantime
Honghai Zhang82d78622016-05-06 11:29:15 -0700358 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800359 return false;
360 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000361
362 // If the specified timeout expired, return
363
Honghai Zhang82d78622016-05-06 11:29:15 -0700364 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000365 cmsElapsed = TimeDiff(msCurrent, msStart);
366 if (cmsWait != kForever) {
367 if (cmsElapsed >= cmsWait)
368 return false;
369 }
370 }
371 return false;
372}
373
Yves Gerey665174f2018-06-19 15:03:05 +0200374void MessageQueue::ReceiveSends() {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000375
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700376void MessageQueue::Post(const Location& posted_from,
377 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200378 uint32_t id,
379 MessageData* pdata,
380 bool time_sensitive) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200381 if (IsQuitting())
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000382 return;
383
384 // Keep thread safe
385 // Add the message to the end of the queue
386 // Signal for the multiplexer to return
387
jbauch9ccedc32016-02-25 01:14:56 -0800388 {
389 CritScope cs(&crit_);
390 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700391 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800392 msg.phandler = phandler;
393 msg.message_id = id;
394 msg.pdata = pdata;
395 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700396 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800397 }
398 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000399 }
jbauch9ccedc32016-02-25 01:14:56 -0800400 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000401}
402
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700403void MessageQueue::PostDelayed(const Location& posted_from,
404 int cmsDelay,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000405 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200406 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000407 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700408 return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id,
409 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000410}
411
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700412void MessageQueue::PostAt(const Location& posted_from,
413 uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000414 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200415 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000416 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700417 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700418 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700419 return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata);
Honghai Zhang82d78622016-05-06 11:29:15 -0700420}
421
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700422void MessageQueue::PostAt(const Location& posted_from,
423 int64_t tstamp,
Honghai Zhang82d78622016-05-06 11:29:15 -0700424 MessageHandler* phandler,
425 uint32_t id,
426 MessageData* pdata) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700427 return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id,
428 pdata);
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000429}
430
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700431void MessageQueue::DoDelayPost(const Location& posted_from,
432 int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700433 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200434 MessageHandler* phandler,
435 uint32_t id,
436 MessageData* pdata) {
André Susano Pinto02a57972016-07-22 13:30:05 +0200437 if (IsQuitting()) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000438 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700439 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000440
441 // Keep thread safe
442 // Add to the priority queue. Gets sorted soonest first.
443 // Signal for the multiplexer to return.
444
jbauch9ccedc32016-02-25 01:14:56 -0800445 {
446 CritScope cs(&crit_);
447 Message msg;
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700448 msg.posted_from = posted_from;
jbauch9ccedc32016-02-25 01:14:56 -0800449 msg.phandler = phandler;
450 msg.message_id = id;
451 msg.pdata = pdata;
452 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
453 dmsgq_.push(dmsg);
454 // If this message queue processes 1 message every millisecond for 50 days,
455 // we will wrap this number. Even then, only messages with identical times
456 // will be misordered, and then only briefly. This is probably ok.
nisse7ce109a2017-01-31 00:57:56 -0800457 ++dmsgq_next_num_;
458 RTC_DCHECK_NE(0, dmsgq_next_num_);
jbauch9ccedc32016-02-25 01:14:56 -0800459 }
460 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000461}
462
463int MessageQueue::GetDelay() {
464 CritScope cs(&crit_);
465
466 if (!msgq_.empty())
467 return 0;
468
469 if (!dmsgq_.empty()) {
470 int delay = TimeUntil(dmsgq_.top().msTrigger_);
471 if (delay < 0)
472 delay = 0;
473 return delay;
474 }
475
476 return kForever;
477}
478
Peter Boström0c4e06b2015-10-07 12:23:21 +0200479void MessageQueue::Clear(MessageHandler* phandler,
480 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000481 MessageList* removed) {
482 CritScope cs(&crit_);
483
484 // Remove messages with phandler
485
486 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
487 if (removed) {
488 removed->push_back(msgPeek_);
489 } else {
490 delete msgPeek_.pdata;
491 }
492 fPeekKeep_ = false;
493 }
494
495 // Remove from ordered message queue
496
497 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
498 if (it->Match(phandler, id)) {
499 if (removed) {
500 removed->push_back(*it);
501 } else {
502 delete it->pdata;
503 }
504 it = msgq_.erase(it);
505 } else {
506 ++it;
507 }
508 }
509
510 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000511
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000512 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
513 for (PriorityQueue::container_type::iterator it = new_end;
514 it != dmsgq_.container().end(); ++it) {
515 if (it->msg_.Match(phandler, id)) {
516 if (removed) {
517 removed->push_back(it->msg_);
518 } else {
519 delete it->msg_.pdata;
520 }
521 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000522 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000523 }
524 }
525 dmsgq_.container().erase(new_end, dmsgq_.container().end());
526 dmsgq_.reheap();
527}
528
Yves Gerey665174f2018-06-19 15:03:05 +0200529void MessageQueue::Dispatch(Message* pmsg) {
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700530 TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file_and_line",
531 pmsg->posted_from.file_and_line(), "src_func",
532 pmsg->posted_from.function_name());
533 int64_t start_time = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000534 pmsg->phandler->OnMessage(pmsg);
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700535 int64_t end_time = TimeMillis();
536 int64_t diff = TimeDiff(end_time, start_time);
537 if (diff >= kSlowDispatchLoggingThreshold) {
Mirko Bonadei675513b2017-11-09 11:09:25 +0100538 RTC_LOG(LS_INFO) << "Message took " << diff
539 << "ms to dispatch. Posted from: "
540 << pmsg->posted_from.ToString();
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700541 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000542}
543
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000544} // namespace rtc