blob: c435dd889e87e705633fe174889d79f1bf77819e [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000010#include <algorithm>
11
danilchapbebf54c2016-04-28 01:32:48 -070012#include "webrtc/base/checks.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000013#include "webrtc/base/common.h"
14#include "webrtc/base/logging.h"
15#include "webrtc/base/messagequeue.h"
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000016
17namespace rtc {
18
Peter Boström0c4e06b2015-10-07 12:23:21 +020019const uint32_t kMaxMsgLatency = 150; // 150 ms
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000020
21//------------------------------------------------------------------
22// MessageQueueManager
23
24MessageQueueManager* MessageQueueManager::instance_ = NULL;
25
26MessageQueueManager* MessageQueueManager::Instance() {
27 // Note: This is not thread safe, but it is first called before threads are
28 // spawned.
29 if (!instance_)
30 instance_ = new MessageQueueManager;
31 return instance_;
32}
33
34bool MessageQueueManager::IsInitialized() {
35 return instance_ != NULL;
36}
37
38MessageQueueManager::MessageQueueManager() {
39}
40
41MessageQueueManager::~MessageQueueManager() {
42}
43
44void MessageQueueManager::Add(MessageQueue *message_queue) {
45 return Instance()->AddInternal(message_queue);
46}
47void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
48 // MessageQueueManager methods should be non-reentrant, so we
49 // ASSERT that is the case. If any of these ASSERT, please
50 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020051#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000052 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000053#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000054 CritScope cs(&crit_);
55 message_queues_.push_back(message_queue);
56}
57
58void MessageQueueManager::Remove(MessageQueue *message_queue) {
59 // If there isn't a message queue manager instance, then there isn't a queue
60 // to remove.
61 if (!instance_) return;
62 return Instance()->RemoveInternal(message_queue);
63}
64void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020065#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000066 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000067#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000068 // If this is the last MessageQueue, destroy the manager as well so that
69 // we don't leak this object at program shutdown. As mentioned above, this is
70 // not thread-safe, but this should only happen at program termination (when
71 // the ThreadManager is destroyed, and threads are no longer active).
72 bool destroy = false;
73 {
74 CritScope cs(&crit_);
75 std::vector<MessageQueue *>::iterator iter;
76 iter = std::find(message_queues_.begin(), message_queues_.end(),
77 message_queue);
78 if (iter != message_queues_.end()) {
79 message_queues_.erase(iter);
80 }
81 destroy = message_queues_.empty();
82 }
83 if (destroy) {
84 instance_ = NULL;
85 delete this;
86 }
87}
88
89void MessageQueueManager::Clear(MessageHandler *handler) {
90 // If there isn't a message queue manager instance, then there aren't any
91 // queues to remove this handler from.
92 if (!instance_) return;
93 return Instance()->ClearInternal(handler);
94}
95void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +020096#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000097 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000098#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000099 CritScope cs(&crit_);
100 std::vector<MessageQueue *>::iterator iter;
101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
102 (*iter)->Clear(handler);
103}
104
105//------------------------------------------------------------------
106// MessageQueue
jbauch25d1f282016-02-05 00:25:02 -0800107MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
jbauch9ccedc32016-02-25 01:14:56 -0800108 : fStop_(false), fPeekKeep_(false),
109 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
danilchapbebf54c2016-04-28 01:32:48 -0700110 RTC_DCHECK(ss);
111 // Currently, MessageQueue holds a socket server, and is the base class for
112 // Thread. It seems like it makes more sense for Thread to hold the socket
113 // server, and provide it to the MessageQueue, since the Thread controls
114 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
115 // messagequeue_unittest to depend on network libraries... yuck.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000116 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800117 if (init_queue) {
118 DoInit();
119 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000120}
121
danilchapbebf54c2016-04-28 01:32:48 -0700122MessageQueue::MessageQueue(std::unique_ptr<SocketServer> ss, bool init_queue)
123 : MessageQueue(ss.get(), init_queue) {
124 own_ss_ = std::move(ss);
125}
126
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000127MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800128 DoDestroy();
129}
130
131void MessageQueue::DoInit() {
132 if (fInitialized_) {
133 return;
134 }
135
136 fInitialized_ = true;
137 MessageQueueManager::Add(this);
138}
139
140void MessageQueue::DoDestroy() {
141 if (fDestroyed_) {
142 return;
143 }
144
145 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000146 // The signal is done from here to ensure
147 // that it always gets called when the queue
148 // is going away.
149 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000150 MessageQueueManager::Remove(this);
151 Clear(NULL);
jbauch9ccedc32016-02-25 01:14:56 -0800152
153 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000154 if (ss_) {
155 ss_->SetMessageQueue(NULL);
156 }
157}
158
jbauch9ccedc32016-02-25 01:14:56 -0800159SocketServer* MessageQueue::socketserver() {
160 SharedScope ss(&ss_lock_);
161 return ss_;
162}
163
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000164void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800165 // Need to lock exclusively here to prevent simultaneous modifications from
166 // other threads. Can't be a shared lock to prevent races with other reading
167 // threads.
168 // Other places that only read "ss_" can use a shared lock as simultaneous
169 // read access is allowed.
170 ExclusiveScope es(&ss_lock_);
danilchapbebf54c2016-04-28 01:32:48 -0700171 ss_ = ss ? ss : own_ss_.get();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000172 ss_->SetMessageQueue(this);
173}
174
jbauch9ccedc32016-02-25 01:14:56 -0800175void MessageQueue::WakeUpSocketServer() {
176 SharedScope ss(&ss_lock_);
177 ss_->WakeUp();
178}
179
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000180void MessageQueue::Quit() {
181 fStop_ = true;
jbauch9ccedc32016-02-25 01:14:56 -0800182 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000183}
184
185bool MessageQueue::IsQuitting() {
186 return fStop_;
187}
188
189void MessageQueue::Restart() {
190 fStop_ = false;
191}
192
193bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
194 if (fPeekKeep_) {
195 *pmsg = msgPeek_;
196 return true;
197 }
198 if (!Get(pmsg, cmsWait))
199 return false;
200 msgPeek_ = *pmsg;
201 fPeekKeep_ = true;
202 return true;
203}
204
205bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
206 // Return and clear peek if present
207 // Always return the peek if it exists so there is Peek/Get symmetry
208
209 if (fPeekKeep_) {
210 *pmsg = msgPeek_;
211 fPeekKeep_ = false;
212 return true;
213 }
214
215 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
216
217 int cmsTotal = cmsWait;
218 int cmsElapsed = 0;
Peter Boström0c4e06b2015-10-07 12:23:21 +0200219 uint32_t msStart = Time();
220 uint32_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000221 while (true) {
222 // Check for sent messages
223 ReceiveSends();
224
225 // Check for posted events
226 int cmsDelayNext = kForever;
227 bool first_pass = true;
228 while (true) {
229 // All queue operations need to be locked, but nothing else in this loop
230 // (specifically handling disposed message) can happen inside the crit.
231 // Otherwise, disposed MessageHandlers will cause deadlocks.
232 {
233 CritScope cs(&crit_);
234 // On the first pass, check for delayed messages that have been
235 // triggered and calculate the next trigger time.
236 if (first_pass) {
237 first_pass = false;
238 while (!dmsgq_.empty()) {
239 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
240 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
241 break;
242 }
243 msgq_.push_back(dmsgq_.top().msg_);
244 dmsgq_.pop();
245 }
246 }
247 // Pull a message off the message queue, if available.
248 if (msgq_.empty()) {
249 break;
250 } else {
251 *pmsg = msgq_.front();
252 msgq_.pop_front();
253 }
254 } // crit_ is released here.
255
256 // Log a warning for time-sensitive messages that we're late to deliver.
257 if (pmsg->ts_sensitive) {
Peter Boström0c4e06b2015-10-07 12:23:21 +0200258 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000259 if (delay > 0) {
260 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
261 << (delay + kMaxMsgLatency) << "ms";
262 }
263 }
264 // If this was a dispose message, delete it and skip it.
265 if (MQID_DISPOSE == pmsg->message_id) {
266 ASSERT(NULL == pmsg->phandler);
267 delete pmsg->pdata;
268 *pmsg = Message();
269 continue;
270 }
271 return true;
272 }
273
274 if (fStop_)
275 break;
276
277 // Which is shorter, the delay wait or the asked wait?
278
279 int cmsNext;
280 if (cmsWait == kForever) {
281 cmsNext = cmsDelayNext;
282 } else {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000283 cmsNext = std::max(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000284 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
285 cmsNext = cmsDelayNext;
286 }
287
jbauch9ccedc32016-02-25 01:14:56 -0800288 {
289 // Wait and multiplex in the meantime
290 SharedScope ss(&ss_lock_);
291 if (!ss_->Wait(cmsNext, process_io))
292 return false;
293 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000294
295 // If the specified timeout expired, return
296
297 msCurrent = Time();
298 cmsElapsed = TimeDiff(msCurrent, msStart);
299 if (cmsWait != kForever) {
300 if (cmsElapsed >= cmsWait)
301 return false;
302 }
303 }
304 return false;
305}
306
307void MessageQueue::ReceiveSends() {
308}
309
Peter Boström0c4e06b2015-10-07 12:23:21 +0200310void MessageQueue::Post(MessageHandler* phandler,
311 uint32_t id,
312 MessageData* pdata,
313 bool time_sensitive) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000314 if (fStop_)
315 return;
316
317 // Keep thread safe
318 // Add the message to the end of the queue
319 // Signal for the multiplexer to return
320
jbauch9ccedc32016-02-25 01:14:56 -0800321 {
322 CritScope cs(&crit_);
323 Message msg;
324 msg.phandler = phandler;
325 msg.message_id = id;
326 msg.pdata = pdata;
327 if (time_sensitive) {
328 msg.ts_sensitive = Time() + kMaxMsgLatency;
329 }
330 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000331 }
jbauch9ccedc32016-02-25 01:14:56 -0800332 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000333}
334
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000335void MessageQueue::PostDelayed(int cmsDelay,
336 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200337 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000338 MessageData* pdata) {
339 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
340}
341
Peter Boström0c4e06b2015-10-07 12:23:21 +0200342void MessageQueue::PostAt(uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000343 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200344 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000345 MessageData* pdata) {
346 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
347}
348
Peter Boström0c4e06b2015-10-07 12:23:21 +0200349void MessageQueue::DoDelayPost(int cmsDelay,
350 uint32_t tstamp,
351 MessageHandler* phandler,
352 uint32_t id,
353 MessageData* pdata) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000354 if (fStop_)
355 return;
356
357 // Keep thread safe
358 // Add to the priority queue. Gets sorted soonest first.
359 // Signal for the multiplexer to return.
360
jbauch9ccedc32016-02-25 01:14:56 -0800361 {
362 CritScope cs(&crit_);
363 Message msg;
364 msg.phandler = phandler;
365 msg.message_id = id;
366 msg.pdata = pdata;
367 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
368 dmsgq_.push(dmsg);
369 // If this message queue processes 1 message every millisecond for 50 days,
370 // we will wrap this number. Even then, only messages with identical times
371 // will be misordered, and then only briefly. This is probably ok.
372 VERIFY(0 != ++dmsgq_next_num_);
373 }
374 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000375}
376
377int MessageQueue::GetDelay() {
378 CritScope cs(&crit_);
379
380 if (!msgq_.empty())
381 return 0;
382
383 if (!dmsgq_.empty()) {
384 int delay = TimeUntil(dmsgq_.top().msTrigger_);
385 if (delay < 0)
386 delay = 0;
387 return delay;
388 }
389
390 return kForever;
391}
392
Peter Boström0c4e06b2015-10-07 12:23:21 +0200393void MessageQueue::Clear(MessageHandler* phandler,
394 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000395 MessageList* removed) {
396 CritScope cs(&crit_);
397
398 // Remove messages with phandler
399
400 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
401 if (removed) {
402 removed->push_back(msgPeek_);
403 } else {
404 delete msgPeek_.pdata;
405 }
406 fPeekKeep_ = false;
407 }
408
409 // Remove from ordered message queue
410
411 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
412 if (it->Match(phandler, id)) {
413 if (removed) {
414 removed->push_back(*it);
415 } else {
416 delete it->pdata;
417 }
418 it = msgq_.erase(it);
419 } else {
420 ++it;
421 }
422 }
423
424 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000425
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000426 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
427 for (PriorityQueue::container_type::iterator it = new_end;
428 it != dmsgq_.container().end(); ++it) {
429 if (it->msg_.Match(phandler, id)) {
430 if (removed) {
431 removed->push_back(it->msg_);
432 } else {
433 delete it->msg_.pdata;
434 }
435 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000436 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000437 }
438 }
439 dmsgq_.container().erase(new_end, dmsgq_.container().end());
440 dmsgq_.reheap();
441}
442
443void MessageQueue::Dispatch(Message *pmsg) {
444 pmsg->phandler->OnMessage(pmsg);
445}
446
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000447} // namespace rtc