blob: 84fdaf1a95ae3858dfaa1ab2873d0b552685da02 [file] [log] [blame]
/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
danilchapbebf54c2016-04-28 01:32:48 -070012#include "webrtc/base/checks.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000013#include "webrtc/base/common.h"
14#include "webrtc/base/logging.h"
15#include "webrtc/base/messagequeue.h"
pbos79e28422016-04-29 08:48:05 -070016#include "webrtc/base/trace_event.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000017
18namespace rtc {
19
// Latency budget, in milliseconds, for messages posted as time sensitive.
// Post() stamps such messages with "now + kMaxMsgLatency" and Get() logs a
// warning when one is delivered after that deadline.
const int kMaxMsgLatency = 150;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000021
//------------------------------------------------------------------
// MessageQueueManager
24
25MessageQueueManager* MessageQueueManager::instance_ = NULL;
26
27MessageQueueManager* MessageQueueManager::Instance() {
28 // Note: This is not thread safe, but it is first called before threads are
29 // spawned.
30 if (!instance_)
31 instance_ = new MessageQueueManager;
32 return instance_;
33}
34
35bool MessageQueueManager::IsInitialized() {
36 return instance_ != NULL;
37}
38
39MessageQueueManager::MessageQueueManager() {
40}
41
42MessageQueueManager::~MessageQueueManager() {
43}
44
45void MessageQueueManager::Add(MessageQueue *message_queue) {
46 return Instance()->AddInternal(message_queue);
47}
48void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
49 // MessageQueueManager methods should be non-reentrant, so we
50 // ASSERT that is the case. If any of these ASSERT, please
51 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020052#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000053 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000054#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000055 CritScope cs(&crit_);
56 message_queues_.push_back(message_queue);
57}
58
59void MessageQueueManager::Remove(MessageQueue *message_queue) {
60 // If there isn't a message queue manager instance, then there isn't a queue
61 // to remove.
62 if (!instance_) return;
63 return Instance()->RemoveInternal(message_queue);
64}
65void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020066#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000067 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000068#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000069 // If this is the last MessageQueue, destroy the manager as well so that
70 // we don't leak this object at program shutdown. As mentioned above, this is
71 // not thread-safe, but this should only happen at program termination (when
72 // the ThreadManager is destroyed, and threads are no longer active).
73 bool destroy = false;
74 {
75 CritScope cs(&crit_);
76 std::vector<MessageQueue *>::iterator iter;
77 iter = std::find(message_queues_.begin(), message_queues_.end(),
78 message_queue);
79 if (iter != message_queues_.end()) {
80 message_queues_.erase(iter);
81 }
82 destroy = message_queues_.empty();
83 }
84 if (destroy) {
85 instance_ = NULL;
86 delete this;
87 }
88}
89
90void MessageQueueManager::Clear(MessageHandler *handler) {
91 // If there isn't a message queue manager instance, then there aren't any
92 // queues to remove this handler from.
93 if (!instance_) return;
94 return Instance()->ClearInternal(handler);
95}
96void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +020097#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000098 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000099#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000100 CritScope cs(&crit_);
101 std::vector<MessageQueue *>::iterator iter;
102 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
103 (*iter)->Clear(handler);
104}
105
//------------------------------------------------------------------
// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800108MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
jbauch9ccedc32016-02-25 01:14:56 -0800109 : fStop_(false), fPeekKeep_(false),
110 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700111 RTC_DCHECK(ss);
112 // Currently, MessageQueue holds a socket server, and is the base class for
113 // Thread. It seems like it makes more sense for Thread to hold the socket
114 // server, and provide it to the MessageQueue, since the Thread controls
115 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
116 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000117 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800118 if (init_queue) {
119 DoInit();
120 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000121}
122
danilchapbebf54c2016-04-28 01:32:48 -0700123MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
124 : MessageQueue(ss.get(), init_queue) {
125 own_ss_ = std::move(ss);
126}
127
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000128MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800129 DoDestroy();
130}
131
132void MessageQueue::DoInit() {
133 if (fInitialized_) {
134 return;
135 }
136
137 fInitialized_ = true;
138 MessageQueueManager::Add(this);
139}
140
141void MessageQueue::DoDestroy() {
142 if (fDestroyed_) {
143 return;
144 }
145
146 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000147 // The signal is done from here to ensure
148 // that it always gets called when the queue
149 // is going away.
150 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000151 MessageQueueManager::Remove(this);
152 Clear(NULL);
jbauch9ccedc32016-02-25 01:14:56 -0800153
154 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000155 if (ss_) {
156 ss_->SetMessageQueue(NULL);
157 }
158}
159
jbauch9ccedc32016-02-25 01:14:56 -0800160SocketServer* MessageQueue::socketserver() {
161 SharedScope ss(&ss_lock_);
162 return ss_;
163}
164
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000165void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800166 // Need to lock exclusively here to prevent simultaneous modifications from
167 // other threads. Can't be a shared lock to prevent races with other reading
168 // threads.
169 // Other places that only read "ss_" can use a shared lock as simultaneous
170 // read access is allowed.
171 ExclusiveScope es(&ss_lock_);
danilchapbebf54c2016-04-28 01:32:48 -0700172 ss_ = ss ? ss : own_ss_.get();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000173 ss_->SetMessageQueue(this);
174}
175
jbauch9ccedc32016-02-25 01:14:56 -0800176void MessageQueue::WakeUpSocketServer() {
177 SharedScope ss(&ss_lock_);
178 ss_->WakeUp();
179}
180
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000181void MessageQueue::Quit() {
182 fStop_ = true;
jbauch9ccedc32016-02-25 01:14:56 -0800183 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000184}
185
186bool MessageQueue::IsQuitting() {
187 return fStop_;
188}
189
190void MessageQueue::Restart() {
191 fStop_ = false;
192}
193
194bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
195 if (fPeekKeep_) {
196 *pmsg = msgPeek_;
197 return true;
198 }
199 if (!Get(pmsg, cmsWait))
200 return false;
201 msgPeek_ = *pmsg;
202 fPeekKeep_ = true;
203 return true;
204}
205
206bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
207 // Return and clear peek if present
208 // Always return the peek if it exists so there is Peek/Get symmetry
209
210 if (fPeekKeep_) {
211 *pmsg = msgPeek_;
212 fPeekKeep_ = false;
213 return true;
214 }
215
216 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
217
Honghai Zhang82d78622016-05-06 11:29:15 -0700218 int64_t cmsTotal = cmsWait;
219 int64_t cmsElapsed = 0;
220 int64_t msStart = TimeMillis();
221 int64_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000222 while (true) {
223 // Check for sent messages
224 ReceiveSends();
225
226 // Check for posted events
Honghai Zhang82d78622016-05-06 11:29:15 -0700227 int64_t cmsDelayNext = kForever;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000228 bool first_pass = true;
229 while (true) {
230 // All queue operations need to be locked, but nothing else in this loop
231 // (specifically handling disposed message) can happen inside the crit.
232 // Otherwise, disposed MessageHandlers will cause deadlocks.
233 {
234 CritScope cs(&crit_);
235 // On the first pass, check for delayed messages that have been
236 // triggered and calculate the next trigger time.
237 if (first_pass) {
238 first_pass = false;
239 while (!dmsgq_.empty()) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700240 if (msCurrent < dmsgq_.top().msTrigger_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000241 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
242 break;
243 }
244 msgq_.push_back(dmsgq_.top().msg_);
245 dmsgq_.pop();
246 }
247 }
248 // Pull a message off the message queue, if available.
249 if (msgq_.empty()) {
250 break;
251 } else {
252 *pmsg = msgq_.front();
253 msgq_.pop_front();
254 }
255 } // crit_ is released here.
256
257 // Log a warning for time-sensitive messages that we're late to deliver.
258 if (pmsg->ts_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700259 int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000260 if (delay > 0) {
261 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
262 << (delay + kMaxMsgLatency) << "ms";
263 }
264 }
265 // If this was a dispose message, delete it and skip it.
266 if (MQID_DISPOSE == pmsg->message_id) {
267 ASSERT(NULL == pmsg->phandler);
268 delete pmsg->pdata;
269 *pmsg = Message();
270 continue;
271 }
272 return true;
273 }
274
275 if (fStop_)
276 break;
277
278 // Which is shorter, the delay wait or the asked wait?
279
Honghai Zhang82d78622016-05-06 11:29:15 -0700280 int64_t cmsNext;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000281 if (cmsWait == kForever) {
282 cmsNext = cmsDelayNext;
283 } else {
Honghai Zhang82d78622016-05-06 11:29:15 -0700284 cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000285 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
286 cmsNext = cmsDelayNext;
287 }
288
jbauch9ccedc32016-02-25 01:14:56 -0800289 {
290 // Wait and multiplex in the meantime
291 SharedScope ss(&ss_lock_);
Honghai Zhang82d78622016-05-06 11:29:15 -0700292 if (!ss_->Wait(static_cast<int>(cmsNext), process_io))
jbauch9ccedc32016-02-25 01:14:56 -0800293 return false;
294 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000295
296 // If the specified timeout expired, return
297
Honghai Zhang82d78622016-05-06 11:29:15 -0700298 msCurrent = TimeMillis();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000299 cmsElapsed = TimeDiff(msCurrent, msStart);
300 if (cmsWait != kForever) {
301 if (cmsElapsed >= cmsWait)
302 return false;
303 }
304 }
305 return false;
306}
307
308void MessageQueue::ReceiveSends() {
309}
310
Peter Boström0c4e06b2015-10-07 12:23:21 +0200311void MessageQueue::Post(MessageHandler* phandler,
312 uint32_t id,
313 MessageData* pdata,
314 bool time_sensitive) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000315 if (fStop_)
316 return;
317
318 // Keep thread safe
319 // Add the message to the end of the queue
320 // Signal for the multiplexer to return
321
jbauch9ccedc32016-02-25 01:14:56 -0800322 {
323 CritScope cs(&crit_);
324 Message msg;
325 msg.phandler = phandler;
326 msg.message_id = id;
327 msg.pdata = pdata;
328 if (time_sensitive) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700329 msg.ts_sensitive = TimeMillis() + kMaxMsgLatency;
jbauch9ccedc32016-02-25 01:14:56 -0800330 }
331 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000332 }
jbauch9ccedc32016-02-25 01:14:56 -0800333 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000334}
335
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000336void MessageQueue::PostDelayed(int cmsDelay,
337 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200338 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000339 MessageData* pdata) {
340 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
341}
342
Peter Boström0c4e06b2015-10-07 12:23:21 +0200343void MessageQueue::PostAt(uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000344 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200345 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000346 MessageData* pdata) {
Honghai Zhang82d78622016-05-06 11:29:15 -0700347 // This should work even if it is used (unexpectedly).
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700348 int64_t delay = static_cast<uint32_t>(TimeMillis()) - tstamp;
Honghai Zhang82d78622016-05-06 11:29:15 -0700349 return DoDelayPost(delay, tstamp, phandler, id, pdata);
350}
351
352void MessageQueue::PostAt(int64_t tstamp,
353 MessageHandler* phandler,
354 uint32_t id,
355 MessageData* pdata) {
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
357}
358
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700359void MessageQueue::DoDelayPost(int64_t cmsDelay,
Honghai Zhang82d78622016-05-06 11:29:15 -0700360 int64_t tstamp,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200361 MessageHandler* phandler,
362 uint32_t id,
363 MessageData* pdata) {
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700364 if (fStop_) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000365 return;
Taylor Brandstetter2b3bf6b2016-05-19 14:57:31 -0700366 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000367
368 // Keep thread safe
369 // Add to the priority queue. Gets sorted soonest first.
370 // Signal for the multiplexer to return.
371
jbauch9ccedc32016-02-25 01:14:56 -0800372 {
373 CritScope cs(&crit_);
374 Message msg;
375 msg.phandler = phandler;
376 msg.message_id = id;
377 msg.pdata = pdata;
378 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
379 dmsgq_.push(dmsg);
380 // If this message queue processes 1 message every millisecond for 50 days,
381 // we will wrap this number. Even then, only messages with identical times
382 // will be misordered, and then only briefly. This is probably ok.
383 VERIFY(0 != ++dmsgq_next_num_);
384 }
385 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000386}
387
388int MessageQueue::GetDelay() {
389 CritScope cs(&crit_);
390
391 if (!msgq_.empty())
392 return 0;
393
394 if (!dmsgq_.empty()) {
395 int delay = TimeUntil(dmsgq_.top().msTrigger_);
396 if (delay < 0)
397 delay = 0;
398 return delay;
399 }
400
401 return kForever;
402}
403
Peter Boström0c4e06b2015-10-07 12:23:21 +0200404void MessageQueue::Clear(MessageHandler* phandler,
405 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000406 MessageList* removed) {
407 CritScope cs(&crit_);
408
409 // Remove messages with phandler
410
411 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
412 if (removed) {
413 removed->push_back(msgPeek_);
414 } else {
415 delete msgPeek_.pdata;
416 }
417 fPeekKeep_ = false;
418 }
419
420 // Remove from ordered message queue
421
422 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
423 if (it->Match(phandler, id)) {
424 if (removed) {
425 removed->push_back(*it);
426 } else {
427 delete it->pdata;
428 }
429 it = msgq_.erase(it);
430 } else {
431 ++it;
432 }
433 }
434
435 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000436
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000437 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
438 for (PriorityQueue::container_type::iterator it = new_end;
439 it != dmsgq_.container().end(); ++it) {
440 if (it->msg_.Match(phandler, id)) {
441 if (removed) {
442 removed->push_back(it->msg_);
443 } else {
444 delete it->msg_.pdata;
445 }
446 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000447 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000448 }
449 }
450 dmsgq_.container().erase(new_end, dmsgq_.container().end());
451 dmsgq_.reheap();
452}
453
454void MessageQueue::Dispatch(Message *pmsg) {
pbos79e28422016-04-29 08:48:05 -0700455 TRACE_EVENT0("webrtc", "MessageQueue::Dispatch");
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000456 pmsg->phandler->OnMessage(pmsg);
457}
458
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000459} // namespace rtc