blob: 61aa61192bbad69d87ddf5e79aab3a5af3ce290f [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000015#include <algorithm>
16
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000017#include "webrtc/base/common.h"
18#include "webrtc/base/logging.h"
19#include "webrtc/base/messagequeue.h"
20#if defined(__native_client__)
21#include "webrtc/base/nullsocketserver.h"
22typedef rtc::NullSocketServer DefaultSocketServer;
23#else
24#include "webrtc/base/physicalsocketserver.h"
25typedef rtc::PhysicalSocketServer DefaultSocketServer;
26#endif
27
28namespace rtc {
29
Peter Boström0c4e06b2015-10-07 12:23:21 +020030const uint32_t kMaxMsgLatency = 150; // 150 ms
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000031
32//------------------------------------------------------------------
33// MessageQueueManager
34
35MessageQueueManager* MessageQueueManager::instance_ = NULL;
36
37MessageQueueManager* MessageQueueManager::Instance() {
38 // Note: This is not thread safe, but it is first called before threads are
39 // spawned.
40 if (!instance_)
41 instance_ = new MessageQueueManager;
42 return instance_;
43}
44
45bool MessageQueueManager::IsInitialized() {
46 return instance_ != NULL;
47}
48
49MessageQueueManager::MessageQueueManager() {
50}
51
52MessageQueueManager::~MessageQueueManager() {
53}
54
55void MessageQueueManager::Add(MessageQueue *message_queue) {
56 return Instance()->AddInternal(message_queue);
57}
58void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
59 // MessageQueueManager methods should be non-reentrant, so we
60 // ASSERT that is the case. If any of these ASSERT, please
61 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020062#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000063 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000064#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000065 CritScope cs(&crit_);
66 message_queues_.push_back(message_queue);
67}
68
69void MessageQueueManager::Remove(MessageQueue *message_queue) {
70 // If there isn't a message queue manager instance, then there isn't a queue
71 // to remove.
72 if (!instance_) return;
73 return Instance()->RemoveInternal(message_queue);
74}
75void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020076#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000077 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000078#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000079 // If this is the last MessageQueue, destroy the manager as well so that
80 // we don't leak this object at program shutdown. As mentioned above, this is
81 // not thread-safe, but this should only happen at program termination (when
82 // the ThreadManager is destroyed, and threads are no longer active).
83 bool destroy = false;
84 {
85 CritScope cs(&crit_);
86 std::vector<MessageQueue *>::iterator iter;
87 iter = std::find(message_queues_.begin(), message_queues_.end(),
88 message_queue);
89 if (iter != message_queues_.end()) {
90 message_queues_.erase(iter);
91 }
92 destroy = message_queues_.empty();
93 }
94 if (destroy) {
95 instance_ = NULL;
96 delete this;
97 }
98}
99
100void MessageQueueManager::Clear(MessageHandler *handler) {
101 // If there isn't a message queue manager instance, then there aren't any
102 // queues to remove this handler from.
103 if (!instance_) return;
104 return Instance()->ClearInternal(handler);
105}
106void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +0200107#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000108 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +0000109#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000110 CritScope cs(&crit_);
111 std::vector<MessageQueue *>::iterator iter;
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113 (*iter)->Clear(handler);
114}
115
116//------------------------------------------------------------------
117// MessageQueue
118
jbauch25d1f282016-02-05 00:25:02 -0800119MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
jbauch9ccedc32016-02-25 01:14:56 -0800120 : fStop_(false), fPeekKeep_(false),
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000122 if (!ss_) {
123 // Currently, MessageQueue holds a socket server, and is the base class for
124 // Thread. It seems like it makes more sense for Thread to hold the socket
125 // server, and provide it to the MessageQueue, since the Thread controls
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
127 // messagequeue_unittest to depend on network libraries... yuck.
128 default_ss_.reset(new DefaultSocketServer());
129 ss_ = default_ss_.get();
130 }
131 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800132 if (init_queue) {
133 DoInit();
134 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000135}
136
137MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800138 DoDestroy();
139}
140
141void MessageQueue::DoInit() {
142 if (fInitialized_) {
143 return;
144 }
145
146 fInitialized_ = true;
147 MessageQueueManager::Add(this);
148}
149
150void MessageQueue::DoDestroy() {
151 if (fDestroyed_) {
152 return;
153 }
154
155 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000156 // The signal is done from here to ensure
157 // that it always gets called when the queue
158 // is going away.
159 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000160 MessageQueueManager::Remove(this);
161 Clear(NULL);
jbauch9ccedc32016-02-25 01:14:56 -0800162
163 SharedScope ss(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000164 if (ss_) {
165 ss_->SetMessageQueue(NULL);
166 }
167}
168
jbauch9ccedc32016-02-25 01:14:56 -0800169SocketServer* MessageQueue::socketserver() {
170 SharedScope ss(&ss_lock_);
171 return ss_;
172}
173
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000174void MessageQueue::set_socketserver(SocketServer* ss) {
jbauch9ccedc32016-02-25 01:14:56 -0800175 // Need to lock exclusively here to prevent simultaneous modifications from
176 // other threads. Can't be a shared lock to prevent races with other reading
177 // threads.
178 // Other places that only read "ss_" can use a shared lock as simultaneous
179 // read access is allowed.
180 ExclusiveScope es(&ss_lock_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000181 ss_ = ss ? ss : default_ss_.get();
182 ss_->SetMessageQueue(this);
183}
184
jbauch9ccedc32016-02-25 01:14:56 -0800185void MessageQueue::WakeUpSocketServer() {
186 SharedScope ss(&ss_lock_);
187 ss_->WakeUp();
188}
189
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000190void MessageQueue::Quit() {
191 fStop_ = true;
jbauch9ccedc32016-02-25 01:14:56 -0800192 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000193}
194
195bool MessageQueue::IsQuitting() {
196 return fStop_;
197}
198
199void MessageQueue::Restart() {
200 fStop_ = false;
201}
202
203bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
204 if (fPeekKeep_) {
205 *pmsg = msgPeek_;
206 return true;
207 }
208 if (!Get(pmsg, cmsWait))
209 return false;
210 msgPeek_ = *pmsg;
211 fPeekKeep_ = true;
212 return true;
213}
214
215bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
216 // Return and clear peek if present
217 // Always return the peek if it exists so there is Peek/Get symmetry
218
219 if (fPeekKeep_) {
220 *pmsg = msgPeek_;
221 fPeekKeep_ = false;
222 return true;
223 }
224
225 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
226
227 int cmsTotal = cmsWait;
228 int cmsElapsed = 0;
Peter Boström0c4e06b2015-10-07 12:23:21 +0200229 uint32_t msStart = Time();
230 uint32_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000231 while (true) {
232 // Check for sent messages
233 ReceiveSends();
234
235 // Check for posted events
236 int cmsDelayNext = kForever;
237 bool first_pass = true;
238 while (true) {
239 // All queue operations need to be locked, but nothing else in this loop
240 // (specifically handling disposed message) can happen inside the crit.
241 // Otherwise, disposed MessageHandlers will cause deadlocks.
242 {
243 CritScope cs(&crit_);
244 // On the first pass, check for delayed messages that have been
245 // triggered and calculate the next trigger time.
246 if (first_pass) {
247 first_pass = false;
248 while (!dmsgq_.empty()) {
249 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
250 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
251 break;
252 }
253 msgq_.push_back(dmsgq_.top().msg_);
254 dmsgq_.pop();
255 }
256 }
257 // Pull a message off the message queue, if available.
258 if (msgq_.empty()) {
259 break;
260 } else {
261 *pmsg = msgq_.front();
262 msgq_.pop_front();
263 }
264 } // crit_ is released here.
265
266 // Log a warning for time-sensitive messages that we're late to deliver.
267 if (pmsg->ts_sensitive) {
Peter Boström0c4e06b2015-10-07 12:23:21 +0200268 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000269 if (delay > 0) {
270 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
271 << (delay + kMaxMsgLatency) << "ms";
272 }
273 }
274 // If this was a dispose message, delete it and skip it.
275 if (MQID_DISPOSE == pmsg->message_id) {
276 ASSERT(NULL == pmsg->phandler);
277 delete pmsg->pdata;
278 *pmsg = Message();
279 continue;
280 }
281 return true;
282 }
283
284 if (fStop_)
285 break;
286
287 // Which is shorter, the delay wait or the asked wait?
288
289 int cmsNext;
290 if (cmsWait == kForever) {
291 cmsNext = cmsDelayNext;
292 } else {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000293 cmsNext = std::max(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000294 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
295 cmsNext = cmsDelayNext;
296 }
297
jbauch9ccedc32016-02-25 01:14:56 -0800298 {
299 // Wait and multiplex in the meantime
300 SharedScope ss(&ss_lock_);
301 if (!ss_->Wait(cmsNext, process_io))
302 return false;
303 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000304
305 // If the specified timeout expired, return
306
307 msCurrent = Time();
308 cmsElapsed = TimeDiff(msCurrent, msStart);
309 if (cmsWait != kForever) {
310 if (cmsElapsed >= cmsWait)
311 return false;
312 }
313 }
314 return false;
315}
316
317void MessageQueue::ReceiveSends() {
318}
319
Peter Boström0c4e06b2015-10-07 12:23:21 +0200320void MessageQueue::Post(MessageHandler* phandler,
321 uint32_t id,
322 MessageData* pdata,
323 bool time_sensitive) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000324 if (fStop_)
325 return;
326
327 // Keep thread safe
328 // Add the message to the end of the queue
329 // Signal for the multiplexer to return
330
jbauch9ccedc32016-02-25 01:14:56 -0800331 {
332 CritScope cs(&crit_);
333 Message msg;
334 msg.phandler = phandler;
335 msg.message_id = id;
336 msg.pdata = pdata;
337 if (time_sensitive) {
338 msg.ts_sensitive = Time() + kMaxMsgLatency;
339 }
340 msgq_.push_back(msg);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000341 }
jbauch9ccedc32016-02-25 01:14:56 -0800342 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000343}
344
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000345void MessageQueue::PostDelayed(int cmsDelay,
346 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200347 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000348 MessageData* pdata) {
349 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
350}
351
Peter Boström0c4e06b2015-10-07 12:23:21 +0200352void MessageQueue::PostAt(uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000353 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200354 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000355 MessageData* pdata) {
356 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
357}
358
Peter Boström0c4e06b2015-10-07 12:23:21 +0200359void MessageQueue::DoDelayPost(int cmsDelay,
360 uint32_t tstamp,
361 MessageHandler* phandler,
362 uint32_t id,
363 MessageData* pdata) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000364 if (fStop_)
365 return;
366
367 // Keep thread safe
368 // Add to the priority queue. Gets sorted soonest first.
369 // Signal for the multiplexer to return.
370
jbauch9ccedc32016-02-25 01:14:56 -0800371 {
372 CritScope cs(&crit_);
373 Message msg;
374 msg.phandler = phandler;
375 msg.message_id = id;
376 msg.pdata = pdata;
377 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
378 dmsgq_.push(dmsg);
379 // If this message queue processes 1 message every millisecond for 50 days,
380 // we will wrap this number. Even then, only messages with identical times
381 // will be misordered, and then only briefly. This is probably ok.
382 VERIFY(0 != ++dmsgq_next_num_);
383 }
384 WakeUpSocketServer();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000385}
386
387int MessageQueue::GetDelay() {
388 CritScope cs(&crit_);
389
390 if (!msgq_.empty())
391 return 0;
392
393 if (!dmsgq_.empty()) {
394 int delay = TimeUntil(dmsgq_.top().msTrigger_);
395 if (delay < 0)
396 delay = 0;
397 return delay;
398 }
399
400 return kForever;
401}
402
Peter Boström0c4e06b2015-10-07 12:23:21 +0200403void MessageQueue::Clear(MessageHandler* phandler,
404 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000405 MessageList* removed) {
406 CritScope cs(&crit_);
407
408 // Remove messages with phandler
409
410 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
411 if (removed) {
412 removed->push_back(msgPeek_);
413 } else {
414 delete msgPeek_.pdata;
415 }
416 fPeekKeep_ = false;
417 }
418
419 // Remove from ordered message queue
420
421 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
422 if (it->Match(phandler, id)) {
423 if (removed) {
424 removed->push_back(*it);
425 } else {
426 delete it->pdata;
427 }
428 it = msgq_.erase(it);
429 } else {
430 ++it;
431 }
432 }
433
434 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000435
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000436 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
437 for (PriorityQueue::container_type::iterator it = new_end;
438 it != dmsgq_.container().end(); ++it) {
439 if (it->msg_.Match(phandler, id)) {
440 if (removed) {
441 removed->push_back(it->msg_);
442 } else {
443 delete it->msg_.pdata;
444 }
445 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000446 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000447 }
448 }
449 dmsgq_.container().erase(new_end, dmsgq_.container().end());
450 dmsgq_.reheap();
451}
452
453void MessageQueue::Dispatch(Message *pmsg) {
454 pmsg->phandler->OnMessage(pmsg);
455}
456
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000457} // namespace rtc