/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include <algorithm>

#include "webrtc/base/checks.h"
#include "webrtc/base/common.h"
#include "webrtc/base/logging.h"
#include "webrtc/base/messagequeue.h"
#include "webrtc/base/trace_event.h"

namespace {

enum { MSG_WAKE_MESSAGE_QUEUE = 1 };
}

namespace rtc {

const int kMaxMsgLatency = 150;  // 150 ms

//------------------------------------------------------------------
// MessageQueueManager

MessageQueueManager* MessageQueueManager::instance_ = NULL;

MessageQueueManager* MessageQueueManager::Instance() {
  // Note: This is not thread safe, but it is first called before threads are
  // spawned.
  if (!instance_)
    instance_ = new MessageQueueManager;
  return instance_;
}

bool MessageQueueManager::IsInitialized() {
  return instance_ != NULL;
}

MessageQueueManager::MessageQueueManager() {
}

MessageQueueManager::~MessageQueueManager() {
}

void MessageQueueManager::Add(MessageQueue *message_queue) {
  return Instance()->AddInternal(message_queue);
}
void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
  // MessageQueueManager methods should be non-reentrant, so we
  // ASSERT that is the case. If any of these ASSERT, please
  // contact bpm or jbeda.
#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
  ASSERT(!crit_.CurrentThreadIsOwner());
#endif
  CritScope cs(&crit_);
  message_queues_.push_back(message_queue);
}

void MessageQueueManager::Remove(MessageQueue *message_queue) {
  // If there isn't a message queue manager instance, then there isn't a queue
  // to remove.
  if (!instance_) return;
  return Instance()->RemoveInternal(message_queue);
}
void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
#endif
  // If this is the last MessageQueue, destroy the manager as well so that
  // we don't leak this object at program shutdown. As mentioned above, this is
  // not thread-safe, but this should only happen at program termination (when
  // the ThreadManager is destroyed, and threads are no longer active).
  bool destroy = false;
  {
    CritScope cs(&crit_);
    std::vector<MessageQueue *>::iterator iter;
    iter = std::find(message_queues_.begin(), message_queues_.end(),
                     message_queue);
    if (iter != message_queues_.end()) {
      message_queues_.erase(iter);
    }
    destroy = message_queues_.empty();
  }
  if (destroy) {
    instance_ = NULL;
    delete this;
  }
}

void MessageQueueManager::Clear(MessageHandler *handler) {
  // If there isn't a message queue manager instance, then there aren't any
  // queues to remove this handler from.
  if (!instance_) return;
  return Instance()->ClearInternal(handler);
}
void MessageQueueManager::ClearInternal(MessageHandler *handler) {
#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
#endif
  CritScope cs(&crit_);
  std::vector<MessageQueue *>::iterator iter;
  for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
    (*iter)->Clear(handler);
}

void MessageQueueManager::WakeAllMessageQueues() {
  if (!instance_) {
    return;
  }
  return Instance()->WakeAllMessageQueuesInternal();
}

void MessageQueueManager::WakeAllMessageQueuesInternal() {
#if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
  ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
#endif
  CritScope cs(&crit_);
  for (MessageQueue* queue : message_queues_) {
    // Posting an arbitrary message will force the message queue to wake up.
    queue->Post(this, MSG_WAKE_MESSAGE_QUEUE);
  }
}

void MessageQueueManager::OnMessage(Message* pmsg) {
  RTC_DCHECK(pmsg->message_id == MSG_WAKE_MESSAGE_QUEUE);
}
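
// Illustrative usage sketch (not exercised in this file): code that changes a
// global timing assumption, for example a test installing a simulated clock,
// can nudge every registered queue so that threads blocked in Get() recompute
// their wait deadlines against the new clock:
//
//   rtc::MessageQueueManager::WakeAllMessageQueues();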

//------------------------------------------------------------------
// MessageQueue
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
    : fStop_(false), fPeekKeep_(false),
      dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
  RTC_DCHECK(ss);
  // Currently, MessageQueue holds a socket server, and is the base class for
  // Thread. It seems like it makes more sense for Thread to hold the socket
  // server, and provide it to the MessageQueue, since the Thread controls
  // the I/O model, and MQ is agnostic to those details. Anyway, this causes
  // messagequeue_unittest to depend on network libraries... yuck.
  ss_->SetMessageQueue(this);
  if (init_queue) {
    DoInit();
  }
}

MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
    : MessageQueue(ss.get(), init_queue) {
  own_ss_ = std::move(ss);
}

MessageQueue::~MessageQueue() {
  DoDestroy();
}

void MessageQueue::DoInit() {
  if (fInitialized_) {
    return;
  }

  fInitialized_ = true;
  MessageQueueManager::Add(this);
}

void MessageQueue::DoDestroy() {
  if (fDestroyed_) {
    return;
  }

  fDestroyed_ = true;
  // The signal is done from here to ensure
  // that it always gets called when the queue
  // is going away.
  SignalQueueDestroyed();
  MessageQueueManager::Remove(this);
  Clear(NULL);

  SharedScope ss(&ss_lock_);
  if (ss_) {
    ss_->SetMessageQueue(NULL);
  }
}
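
// The DoInit()/DoDestroy() pair appears to exist so that a subclass (Thread,
// for example) can opt out of automatic setup via init_queue=false and run
// registration/cleanup itself while the most-derived object is fully
// constructed or still alive; virtual calls such as Clear() then resolve to
// the subclass override.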

SocketServer* MessageQueue::socketserver() {
  SharedScope ss(&ss_lock_);
  return ss_;
}

void MessageQueue::set_socketserver(SocketServer* ss) {
  // Need to lock exclusively here to prevent simultaneous modifications from
  // other threads. Can't be a shared lock to prevent races with other reading
  // threads.
  // Other places that only read "ss_" can use a shared lock as simultaneous
  // read access is allowed.
  ExclusiveScope es(&ss_lock_);
  ss_ = ss ? ss : own_ss_.get();
  ss_->SetMessageQueue(this);
}

void MessageQueue::WakeUpSocketServer() {
  SharedScope ss(&ss_lock_);
  ss_->WakeUp();
}

void MessageQueue::Quit() {
  fStop_ = true;
  WakeUpSocketServer();
}

bool MessageQueue::IsQuitting() {
  return fStop_;
}

void MessageQueue::Restart() {
  fStop_ = false;
}

bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    return true;
  }
  if (!Get(pmsg, cmsWait))
    return false;
  msgPeek_ = *pmsg;
  fPeekKeep_ = true;
  return true;
}

bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
  // Return and clear peek if present
  // Always return the peek if it exists so there is Peek/Get symmetry

  if (fPeekKeep_) {
    *pmsg = msgPeek_;
    fPeekKeep_ = false;
    return true;
  }

  // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch

  int64_t cmsTotal = cmsWait;
  int64_t cmsElapsed = 0;
  int64_t msStart = TimeMillis();
  int64_t msCurrent = msStart;
  while (true) {
    // Check for sent messages
    ReceiveSends();

    // Check for posted events
    int64_t cmsDelayNext = kForever;
    bool first_pass = true;
    while (true) {
      // All queue operations need to be locked, but nothing else in this loop
      // (specifically handling disposed message) can happen inside the crit.
      // Otherwise, disposed MessageHandlers will cause deadlocks.
      {
        CritScope cs(&crit_);
        // On the first pass, check for delayed messages that have been
        // triggered and calculate the next trigger time.
        if (first_pass) {
          first_pass = false;
          while (!dmsgq_.empty()) {
            if (msCurrent < dmsgq_.top().msTrigger_) {
              cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
              break;
            }
            msgq_.push_back(dmsgq_.top().msg_);
            dmsgq_.pop();
          }
        }
        // Pull a message off the message queue, if available.
        if (msgq_.empty()) {
          break;
        } else {
          *pmsg = msgq_.front();
          msgq_.pop_front();
        }
      }  // crit_ is released here.

      // Log a warning for time-sensitive messages that we're late to deliver.
      if (pmsg->ts_sensitive) {
        int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
        if (delay > 0) {
          LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
                            << (delay + kMaxMsgLatency) << "ms";
        }
      }
      // If this was a dispose message, delete it and skip it.
      if (MQID_DISPOSE == pmsg->message_id) {
        ASSERT(NULL == pmsg->phandler);
        delete pmsg->pdata;
        *pmsg = Message();
        continue;
      }
      return true;
    }

    if (fStop_)
      break;

    // Which is shorter, the delay wait or the asked wait?

    int64_t cmsNext;
    if (cmsWait == kForever) {
      cmsNext = cmsDelayNext;
    } else {
      cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
      if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
        cmsNext = cmsDelayNext;
    }

    {
      // Wait and multiplex in the meantime
      SharedScope ss(&ss_lock_);
      if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
        return false;
    }

    // If the specified timeout expired, return

    msCurrent = TimeMillis();
    cmsElapsed = TimeDiff(msCurrent, msStart);
    if (cmsWait != kForever) {
      if (cmsElapsed >= cmsWait)
        return false;
    }
  }
  return false;
}
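
// Illustrative message-pump sketch (queue is any MessageQueue*, e.g. a
// Thread; roughly what a thread's run loop does). Get() blocks until a
// message is available, Quit() is called, or the socket server wait fails:
//
//   rtc::Message msg;
//   while (queue->Get(&msg, rtc::kForever)) {
//     queue->Dispatch(&msg);  // invokes msg.phandler->OnMessage(&msg)
//   }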

void MessageQueue::ReceiveSends() {
}

void MessageQueue::Post(MessageHandler* phandler,
                        uint32_t id,
                        MessageData* pdata,
                        bool time_sensitive) {
  if (fStop_)
    return;

  // Keep thread safe
  // Add the message to the end of the queue
  // Signal for the multiplexer to return

  {
    CritScope cs(&crit_);
    Message msg;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    if (time_sensitive) {
      msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
    }
    msgq_.push_back(msg);
  }
  WakeUpSocketServer();
}
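
// Illustrative post sketch (PingHandler and kMsgPing are hypothetical names;
// TypedMessageData comes from messagequeue.h). Ownership of pdata passes to
// the receiving side, which typically deletes it in OnMessage():
//
//   struct PingHandler : rtc::MessageHandler {
//     void OnMessage(rtc::Message* msg) override {
//       std::unique_ptr<rtc::TypedMessageData<std::string>> data(
//           static_cast<rtc::TypedMessageData<std::string>*>(msg->pdata));
//       // ... use data->data() ...
//     }
//   };
//
//   queue->Post(&ping_handler, kMsgPing,
//               new rtc::TypedMessageData<std::string>("hello"));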

void MessageQueue::PostDelayed(int cmsDelay,
                               MessageHandler* phandler,
                               uint32_t id,
                               MessageData* pdata) {
  return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
}

void MessageQueue::PostAt(uint32_t tstamp,
                          MessageHandler* phandler,
                          uint32_t id,
                          MessageData* pdata) {
  // This should work even if it is used (unexpectedly).
  int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
  return DoDelayPost(delay, tstamp, phandler, id, pdata);
}

void MessageQueue::PostAt(int64_t tstamp,
                          MessageHandler* phandler,
                          uint32_t id,
                          MessageData* pdata) {
  return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
}

void MessageQueue::DoDelayPost(int64_t cmsDelay,
                               int64_t tstamp,
                               MessageHandler* phandler,
                               uint32_t id,
                               MessageData* pdata) {
  if (fStop_) {
    return;
  }

  // Keep thread safe
  // Add to the priority queue. Gets sorted soonest first.
  // Signal for the multiplexer to return.

  {
    CritScope cs(&crit_);
    Message msg;
    msg.phandler = phandler;
    msg.message_id = id;
    msg.pdata = pdata;
    DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    dmsgq_.push(dmsg);
    // If this message queue processes 1 message every millisecond for 50 days,
    // we will wrap this number. Even then, only messages with identical times
    // will be misordered, and then only briefly. This is probably ok.
    VERIFY(0 != ++dmsgq_next_num_);
  }
  WakeUpSocketServer();
}
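
// Illustrative delayed-post sketch (handler and message ids are hypothetical).
// The priority queue pops soonest-first, with dmsgq_next_num_ breaking ties so
// messages scheduled for the same instant come out in post order:
//
//   queue->PostDelayed(1000, &handler, kMsgTimeout);             // ~1s from now
//   queue->PostAt(rtc::TimeMillis() + 250, &handler, kMsgPoll);  // absolute time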

int MessageQueue::GetDelay() {
  CritScope cs(&crit_);

  if (!msgq_.empty())
    return 0;

  if (!dmsgq_.empty()) {
    int delay = TimeUntil(dmsgq_.top().msTrigger_);
    if (delay < 0)
      delay = 0;
    return delay;
  }

  return kForever;
}

void MessageQueue::Clear(MessageHandler* phandler,
                         uint32_t id,
                         MessageList* removed) {
  CritScope cs(&crit_);

  // Remove messages with phandler

  if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    if (removed) {
      removed->push_back(msgPeek_);
    } else {
      delete msgPeek_.pdata;
    }
    fPeekKeep_ = false;
  }

  // Remove from ordered message queue

  for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    if (it->Match(phandler, id)) {
      if (removed) {
        removed->push_back(*it);
      } else {
        delete it->pdata;
      }
      it = msgq_.erase(it);
    } else {
      ++it;
    }
  }

  // Remove from priority queue. Not directly iterable, so use this approach

  PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
  for (PriorityQueue::container_type::iterator it = new_end;
       it != dmsgq_.container().end(); ++it) {
    if (it->msg_.Match(phandler, id)) {
      if (removed) {
        removed->push_back(it->msg_);
      } else {
        delete it->msg_.pdata;
      }
    } else {
      *new_end++ = *it;
    }
  }
  dmsgq_.container().erase(new_end, dmsgq_.container().end());
  dmsgq_.reheap();
}
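
// Illustrative cleanup sketch (MyHandler is hypothetical): a handler that may
// still have messages queued clears them before it goes away, so the queue
// never dispatches to a dangling pointer:
//
//   MyHandler::~MyHandler() {
//     rtc::Thread::Current()->Clear(this);  // drops queued and delayed messages
//   }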

void MessageQueue::Dispatch(Message *pmsg) {
  TRACE_EVENT0("webrtc", "MessageQueue::Dispatch");
  pmsg->phandler->OnMessage(pmsg);
}

}  // namespace rtc