blob: 4c2331bfe603c17cfd5c5376e85ef591bbcacb4d [file] [log] [blame]
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include <algorithm>
#include <limits>

#include "webrtc/base/atomicops.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/common.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/messagequeue.h"
#include "webrtc/base/thread.h"
#include "webrtc/base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000019
20namespace rtc {
21
// Latency budget, in milliseconds, granted to a time-sensitive message: Post()
// stamps such a message with "now + kMaxMsgLatency" and Get() logs a warning
// when the message is pulled off the queue after that deadline.
const int kMaxMsgLatency = 150;  // 150 ms
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000023
24//------------------------------------------------------------------
25// MessageQueueManager
26
27MessageQueueManager* MessageQueueManager::instance_ = NULL;
28
29MessageQueueManager* MessageQueueManager::Instance() {
30 // Note: This is not thread safe, but it is first called before threads are
31 // spawned.
32 if (!instance_)
33 instance_ = new MessageQueueManager;
34 return instance_;
35}
36
37bool MessageQueueManager::IsInitialized() {
38 return instance_ != NULL;
39}
40
deadbeeff5f03e82016-06-06 11:16:06 -070041MessageQueueManager::MessageQueueManager() {}
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000042
43MessageQueueManager::~MessageQueueManager() {
44}
45
46void MessageQueueManager::Add(MessageQueue *message_queue) {
47 return Instance()->AddInternal(message_queue);
48}
49void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
50 // MessageQueueManager methods should be non-reentrant, so we
51 // ASSERT that is the case. If any of these ASSERT, please
52 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020053#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000054 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000055#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000056 CritScope cs(&crit_);
57 message_queues_.push_back(message_queue);
58}
59
60void MessageQueueManager::Remove(MessageQueue *message_queue) {
61 // If there isn't a message queue manager instance, then there isn't a queue
62 // to remove.
63 if (!instance_) return;
64 return Instance()->RemoveInternal(message_queue);
65}
66void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020067#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000068 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000069#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000070 // If this is the last MessageQueue, destroy the manager as well so that
71 // we don't leak this object at program shutdown. As mentioned above, this is
72 // not thread-safe, but this should only happen at program termination (when
73 // the ThreadManager is destroyed, and threads are no longer active).
74 bool destroy = false;
75 {
76 CritScope cs(&crit_);
77 std::vector<MessageQueue *>::iterator iter;
78 iter = std::find(message_queues_.begin(), message_queues_.end(),
79 message_queue);
80 if (iter != message_queues_.end()) {
81 message_queues_.erase(iter);
82 }
83 destroy = message_queues_.empty();
84 }
85 if (destroy) {
86 instance_ = NULL;
87 delete this;
88 }
89}
90
91void MessageQueueManager::Clear(MessageHandler *handler) {
92 // If there isn't a message queue manager instance, then there aren't any
93 // queues to remove this handler from.
94 if (!instance_) return;
95 return Instance()->ClearInternal(handler);
96}
97void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +020098#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000099 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +0000100#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000101 CritScope cs(&crit_);
102 std::vector<MessageQueue *>::iterator iter;
103 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
104 (*iter)->Clear(handler);
105}
106
deadbeeff5f03e82016-06-06 11:16:06 -0700107void MessageQueueManager::ProcessAllMessageQueues() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700108 if (!instance_) {
109 return;
110 }
deadbeeff5f03e82016-06-06 11:16:06 -0700111 return Instance()->ProcessAllMessageQueuesInternal();
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700112}
113
deadbeeff5f03e82016-06-06 11:16:06 -0700114void MessageQueueManager::ProcessAllMessageQueuesInternal() {
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700115#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
116 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
117#endif
deadbeeff5f03e82016-06-06 11:16:06 -0700118 // Post a delayed message at the current time and wait for it to be dispatched
119 // on all queues, which will ensure that all messages that came before it were
120 // also dispatched.
121 volatile int queues_not_done;
122 auto functor = [&queues_not_done] { AtomicOps::Decrement(&queues_not_done); };
123 FunctorMessageHandler<void, decltype(functor)> handler(functor);
124 {
125 CritScope cs(&crit_);
126 queues_not_done = static_cast<int>(message_queues_.size());
127 for (MessageQueue* queue : message_queues_) {
128 queue->PostDelayed(0, &handler);
129 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700130 }
deadbeeff5f03e82016-06-06 11:16:06 -0700131 // Note: One of the message queues may have been on this thread, which is why
132 // we can't synchronously wait for queues_not_done to go to 0; we need to
133 // process messages as well.
134 while (AtomicOps::AcquireLoad(&queues_not_done) > 0) {
135 rtc::Thread::Current()->ProcessMessages(0);
136 }
Taylor Brandstetterb3c68102016-05-27 14:15:43 -0700137}
138
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000139//------------------------------------------------------------------
140// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800141MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
jbauch9ccedc32016-02-25 01:14:56 -0800142 : fStop_(false), fPeekKeep_(false),
143 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700144 RTC_DCHECK(ss);
145 // Currently, MessageQueue holds a socket server, and is the base class for
146 // Thread. It seems like it makes more sense for Thread to hold the socket
147 // server, and provide it to the MessageQueue, since the Thread controls
148 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
149 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000150 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800151 if (init_queue) {
152 DoInit();
153 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000154}
155
danilchapbebf54c2016-04-28 01:32:48 -0700156MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
157 : MessageQueue(ss.get(), init_queue) {
158 own_ss_ = std::move(ss);
159}
160
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000161MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800162 DoDestroy();
163}
164
165void MessageQueue::DoInit() {
166 if (fInitialized_) {
167 return;
168 }
169
170 fInitialized_ = true;
171 MessageQueueManager::Add(this);
172}
173
174void MessageQueue::DoDestroy() {
175 if (fDestroyed_) {
176 return;
177 }
178
179 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000180 // The signal is done from here to ensure
181 // that it always gets called when the queue
182 // is going away.
183 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000184 MessageQueueManager::Remove(this);
185 Clear(NULL);
jbauch9ccedc32016-02-25 01:14:56 -0800186
187 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000188 if (ss_) {
189 ss_->SetMessageQueue(NULL);
190 }
191}
192
jbauch9ccedc32016-02-25 01:14:56 -0800193SocketServer* MessageQueue::socketserver() {
194 SharedScope ss(&ss_lock_);
195 return ss_;
196}
197
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000198void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800199 // Need to lock exclusively here to prevent simultaneous modifications from
200 // other threads. Can't be a shared lock to prevent races with other reading
201 // threads.
202 // Other places that only read "ss_" can use a shared lock as simultaneous
203 // read access is allowed.
204 ExclusiveScope es(&ss_lock_);
danilchapbebf54c2016-04-28 01:32:48 -0700205 ss_ = ss ? ss : own_ss_.get();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000206 ss_->SetMessageQueue(this);
207}
208
jbauch9ccedc32016-02-25 01:14:56 -0800209void MessageQueue::WakeUpSocketServer() {
210 SharedScope ss(&ss_lock_);
211 ss_->WakeUp();
212}
213
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000214void MessageQueue::Quit() {
215 fStop_ = true;
jbauch9ccedc32016-02-25 01:14:56 -0800216 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000217}
218
219bool MessageQueue::IsQuitting() {
220 return fStop_;
221}
222
223void MessageQueue::Restart() {
224 fStop_ = false;
225}
226
227bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
228 if (fPeekKeep_) {
229 *pmsg = msgPeek_;
230 return true;
231 }
232 if (!Get(pmsg, cmsWait))
233 return false;
234 msgPeek_ = *pmsg;
235 fPeekKeep_ = true;
236 return true;
237}
238
239bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
240 // Return and clear peek if present
241 // Always return the peek if it exists so there is Peek/Get symmetry
242
243 if (fPeekKeep_) {
244 *pmsg = msgPeek_;
245 fPeekKeep_ = false;
246 return true;
247 }
248
249 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
250
Honghai Zhang82d78622016-05-06 11:29:15 -0700251 int64_t cmsTotal = cmsWait;
252 int64_t cmsElapsed = 0;
253 int64_t msStart = TimeMillis();
254 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000255 while (true) {
256 // Check for sent messages
257 ReceiveSends();
258
259 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700260 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000261 bool first_pass = true;
262 while (true) {
263 // All queue operations need to be locked, but nothing else in this loop
264 // (specifically handling disposed message) can happen inside the crit.
265 // Otherwise, disposed MessageHandlers will cause deadlocks.
266 {
267 CritScope cs(&crit_);
268 // On the first pass, check for delayed messages that have been
269 // triggered and calculate the next trigger time.
270 if (first_pass) {
271 first_pass = false;
272 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700273 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000274 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
275 break;
276 }
277 msgq_.push_back(dmsgq_.top().msg_);
278 dmsgq_.pop();
279 }
280 }
281 // Pull a message off the message queue, if available.
282 if (msgq_.empty()) {
283 break;
284 } else {
285 *pmsg = msgq_.front();
286 msgq_.pop_front();
287 }
288 } // crit_ is released here.
289
290 // Log a warning for time-sensitive messages that we're late to deliver.
291 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700292 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000293 if (delay > 0) {
294 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
295 << (delay + kMaxMsgLatency) << "ms";
296 }
297 }
298 // If this was a dispose message, delete it and skip it.
299 if (MQID_DISPOSE == pmsg->message_id) {
300 ASSERT(NULL == pmsg->phandler);
301 delete pmsg->pdata;
302 *pmsg = Message();
303 continue;
304 }
305 return true;
306 }
307
308 if (fStop_)
309 break;
310
311 // Which is shorter, the delay wait or the asked wait?
312
Honghai Zhang82d78622016-05-06 11:29:15 -0700313 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000314 if (cmsWait == kForever) {
315 cmsNext = cmsDelayNext;
316 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700317 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000318 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
319 cmsNext = cmsDelayNext;
320 }
321
jbauch9ccedc32016-02-25 01:14:56 -0800322 {
323 // Wait and multiplex in the meantime
324 SharedScope ss(&ss_lock_);
Honghai Zhang82d78622016-05-06 11:29:15 -0700325 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800326 return false;
327 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000328
329 // If the specified timeout expired, return
330
Honghai Zhang82d78622016-05-06 11:29:15 -0700331 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000332 cmsElapsed = TimeDiff(msCurrent, msStart);
333 if (cmsWait != kForever) {
334 if (cmsElapsed >= cmsWait)
335 return false;
336 }
337 }
338 return false;
339}
340
341void MessageQueue::ReceiveSends() {
342}
343
Peter Boström0c4e06b2015-10-07 12:23:21 +0200344void MessageQueue::Post(MessageHandler* phandler,
345 uint32_t id,
346 MessageData* pdata,
347 bool time_sensitive) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000348 if (fStop_)
349 return;
350
351 // Keep thread safe
352 // Add the message to the end of the queue
353 // Signal for the multiplexer to return
354
jbauch9ccedc32016-02-25 01:14:56 -0800355 {
356 CritScope cs(&crit_);
357 Message msg;
358 msg.phandler = phandler;
359 msg.message_id = id;
360 msg.pdata = pdata;
361 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700362 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800363 }
364 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000365 }
jbauch9ccedc32016-02-25 01:14:56 -0800366 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000367}
368
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000369void MessageQueue::PostDelayed(int cmsDelay,
370 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200371 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000372 MessageData* pdata) {
373 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
374}
375
Peter Boström0c4e06b2015-10-07 12:23:21 +0200376void MessageQueue::PostAt(uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000377 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200378 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000379 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700380 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700381 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Honghai Zhang82d78622016-05-06 11:29:15 -0700382 return DoDelayPost(delay, tstamp, phandler, id, pdata);
383}
384
385void MessageQueue::PostAt(int64_t tstamp,
386 MessageHandler* phandler,
387 uint32_t id,
388 MessageData* pdata) {
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000389 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
390}
391
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700392void MessageQueue::DoDelayPost(int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700393 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200394 MessageHandler* phandler,
395 uint32_t id,
396 MessageData* pdata) {
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700397 if (fStop_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000398 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700399 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000400
401 // Keep thread safe
402 // Add to the priority queue. Gets sorted soonest first.
403 // Signal for the multiplexer to return.
404
jbauch9ccedc32016-02-25 01:14:56 -0800405 {
406 CritScope cs(&crit_);
407 Message msg;
408 msg.phandler = phandler;
409 msg.message_id = id;
410 msg.pdata = pdata;
411 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
412 dmsgq_.push(dmsg);
413 // If this message queue processes 1 message every millisecond for 50 days,
414 // we will wrap this number. Even then, only messages with identical times
415 // will be misordered, and then only briefly. This is probably ok.
416 VERIFY(0 != ++dmsgq_next_num_);
417 }
418 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000419}
420
421int MessageQueue::GetDelay() {
422 CritScope cs(&crit_);
423
424 if (!msgq_.empty())
425 return 0;
426
427 if (!dmsgq_.empty()) {
428 int delay = TimeUntil(dmsgq_.top().msTrigger_);
429 if (delay < 0)
430 delay = 0;
431 return delay;
432 }
433
434 return kForever;
435}
436
Peter Boström0c4e06b2015-10-07 12:23:21 +0200437void MessageQueue::Clear(MessageHandler* phandler,
438 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000439 MessageList* removed) {
440 CritScope cs(&crit_);
441
442 // Remove messages with phandler
443
444 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
445 if (removed) {
446 removed->push_back(msgPeek_);
447 } else {
448 delete msgPeek_.pdata;
449 }
450 fPeekKeep_ = false;
451 }
452
453 // Remove from ordered message queue
454
455 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
456 if (it->Match(phandler, id)) {
457 if (removed) {
458 removed->push_back(*it);
459 } else {
460 delete it->pdata;
461 }
462 it = msgq_.erase(it);
463 } else {
464 ++it;
465 }
466 }
467
468 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000469
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000470 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
471 for (PriorityQueue::container_type::iterator it = new_end;
472 it != dmsgq_.container().end(); ++it) {
473 if (it->msg_.Match(phandler, id)) {
474 if (removed) {
475 removed->push_back(it->msg_);
476 } else {
477 delete it->msg_.pdata;
478 }
479 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000480 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000481 }
482 }
483 dmsgq_.container().erase(new_end, dmsgq_.container().end());
484 dmsgq_.reheap();
485}
486
487void MessageQueue::Dispatch(Message *pmsg) {
pbos79e28422016-04-29 08:48:05 -0700488 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch");
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000489 pmsg->phandler->OnMessage(pmsg);
490}
491
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000492} // namespace rtc