blob: bbdb941ffab4941107b347023f5ab23e0b762b42 [file] [log] [blame]
henrike@webrtc.orgf0488722014-05-13 18:00:26 +00001/*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000015#include <algorithm>
16
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000017#include "webrtc/base/common.h"
18#include "webrtc/base/logging.h"
19#include "webrtc/base/messagequeue.h"
20#if defined(__native_client__)
21#include "webrtc/base/nullsocketserver.h"
22typedef rtc::NullSocketServer DefaultSocketServer;
23#else
24#include "webrtc/base/physicalsocketserver.h"
25typedef rtc::PhysicalSocketServer DefaultSocketServer;
26#endif
27
28namespace rtc {
29
// Latency budget, in milliseconds, granted to messages posted as
// time-sensitive: Post() stamps them with Time() + kMaxMsgLatency and Get()
// logs a warning when one is delivered after that stamp.
const uint32_t kMaxMsgLatency = 150;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000031
32//------------------------------------------------------------------
33// MessageQueueManager
34
35MessageQueueManager* MessageQueueManager::instance_ = NULL;
36
37MessageQueueManager* MessageQueueManager::Instance() {
38 // Note: This is not thread safe, but it is first called before threads are
39 // spawned.
40 if (!instance_)
41 instance_ = new MessageQueueManager;
42 return instance_;
43}
44
45bool MessageQueueManager::IsInitialized() {
46 return instance_ != NULL;
47}
48
49MessageQueueManager::MessageQueueManager() {
50}
51
52MessageQueueManager::~MessageQueueManager() {
53}
54
55void MessageQueueManager::Add(MessageQueue *message_queue) {
56 return Instance()->AddInternal(message_queue);
57}
58void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
59 // MessageQueueManager methods should be non-reentrant, so we
60 // ASSERT that is the case. If any of these ASSERT, please
61 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020062#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000063 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000064#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000065 CritScope cs(&crit_);
66 message_queues_.push_back(message_queue);
67}
68
69void MessageQueueManager::Remove(MessageQueue *message_queue) {
70 // If there isn't a message queue manager instance, then there isn't a queue
71 // to remove.
72 if (!instance_) return;
73 return Instance()->RemoveInternal(message_queue);
74}
75void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020076#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000077 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000078#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000079 // If this is the last MessageQueue, destroy the manager as well so that
80 // we don't leak this object at program shutdown. As mentioned above, this is
81 // not thread-safe, but this should only happen at program termination (when
82 // the ThreadManager is destroyed, and threads are no longer active).
83 bool destroy = false;
84 {
85 CritScope cs(&crit_);
86 std::vector<MessageQueue *>::iterator iter;
87 iter = std::find(message_queues_.begin(), message_queues_.end(),
88 message_queue);
89 if (iter != message_queues_.end()) {
90 message_queues_.erase(iter);
91 }
92 destroy = message_queues_.empty();
93 }
94 if (destroy) {
95 instance_ = NULL;
96 delete this;
97 }
98}
99
100void MessageQueueManager::Clear(MessageHandler *handler) {
101 // If there isn't a message queue manager instance, then there aren't any
102 // queues to remove this handler from.
103 if (!instance_) return;
104 return Instance()->ClearInternal(handler);
105}
106void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +0200107#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000108 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +0000109#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000110 CritScope cs(&crit_);
111 std::vector<MessageQueue *>::iterator iter;
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113 (*iter)->Clear(handler);
114}
115
116//------------------------------------------------------------------
117// MessageQueue
118
jbauch25d1f282016-02-05 00:25:02 -0800119MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
jbauch9674d7c2016-02-19 07:16:16 -0800120 : ss_(ss), fStop_(false), fPeekKeep_(false),
121 dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000122 if (!ss_) {
123 // Currently, MessageQueue holds a socket server, and is the base class for
124 // Thread. It seems like it makes more sense for Thread to hold the socket
125 // server, and provide it to the MessageQueue, since the Thread controls
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
127 // messagequeue_unittest to depend on network libraries... yuck.
128 default_ss_.reset(new DefaultSocketServer());
129 ss_ = default_ss_.get();
130 }
131 ss_->SetMessageQueue(this);
jbauch25d1f282016-02-05 00:25:02 -0800132 if (init_queue) {
133 DoInit();
134 }
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000135}
136
137MessageQueue::~MessageQueue() {
jbauch25d1f282016-02-05 00:25:02 -0800138 DoDestroy();
139}
140
141void MessageQueue::DoInit() {
142 if (fInitialized_) {
143 return;
144 }
145
146 fInitialized_ = true;
147 MessageQueueManager::Add(this);
148}
149
150void MessageQueue::DoDestroy() {
151 if (fDestroyed_) {
152 return;
153 }
154
155 fDestroyed_ = true;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000156 // The signal is done from here to ensure
157 // that it always gets called when the queue
158 // is going away.
159 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000160 MessageQueueManager::Remove(this);
161 Clear(NULL);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000162 if (ss_) {
163 ss_->SetMessageQueue(NULL);
164 }
165}
166
167void MessageQueue::set_socketserver(SocketServer* ss) {
168 ss_ = ss ? ss : default_ss_.get();
169 ss_->SetMessageQueue(this);
170}
171
172void MessageQueue::Quit() {
173 fStop_ = true;
jbauch9674d7c2016-02-19 07:16:16 -0800174 ss_->WakeUp();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000175}
176
177bool MessageQueue::IsQuitting() {
178 return fStop_;
179}
180
181void MessageQueue::Restart() {
182 fStop_ = false;
183}
184
185bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
186 if (fPeekKeep_) {
187 *pmsg = msgPeek_;
188 return true;
189 }
190 if (!Get(pmsg, cmsWait))
191 return false;
192 msgPeek_ = *pmsg;
193 fPeekKeep_ = true;
194 return true;
195}
196
197bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
198 // Return and clear peek if present
199 // Always return the peek if it exists so there is Peek/Get symmetry
200
201 if (fPeekKeep_) {
202 *pmsg = msgPeek_;
203 fPeekKeep_ = false;
204 return true;
205 }
206
207 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
208
209 int cmsTotal = cmsWait;
210 int cmsElapsed = 0;
Peter Boström0c4e06b2015-10-07 12:23:21 +0200211 uint32_t msStart = Time();
212 uint32_t msCurrent = msStart;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000213 while (true) {
214 // Check for sent messages
215 ReceiveSends();
216
217 // Check for posted events
218 int cmsDelayNext = kForever;
219 bool first_pass = true;
220 while (true) {
221 // All queue operations need to be locked, but nothing else in this loop
222 // (specifically handling disposed message) can happen inside the crit.
223 // Otherwise, disposed MessageHandlers will cause deadlocks.
224 {
225 CritScope cs(&crit_);
226 // On the first pass, check for delayed messages that have been
227 // triggered and calculate the next trigger time.
228 if (first_pass) {
229 first_pass = false;
230 while (!dmsgq_.empty()) {
231 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
232 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
233 break;
234 }
235 msgq_.push_back(dmsgq_.top().msg_);
236 dmsgq_.pop();
237 }
238 }
239 // Pull a message off the message queue, if available.
240 if (msgq_.empty()) {
241 break;
242 } else {
243 *pmsg = msgq_.front();
244 msgq_.pop_front();
245 }
246 } // crit_ is released here.
247
248 // Log a warning for time-sensitive messages that we're late to deliver.
249 if (pmsg->ts_sensitive) {
Peter Boström0c4e06b2015-10-07 12:23:21 +0200250 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000251 if (delay > 0) {
252 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
253 << (delay + kMaxMsgLatency) << "ms";
254 }
255 }
256 // If this was a dispose message, delete it and skip it.
257 if (MQID_DISPOSE == pmsg->message_id) {
258 ASSERT(NULL == pmsg->phandler);
259 delete pmsg->pdata;
260 *pmsg = Message();
261 continue;
262 }
263 return true;
264 }
265
266 if (fStop_)
267 break;
268
269 // Which is shorter, the delay wait or the asked wait?
270
271 int cmsNext;
272 if (cmsWait == kForever) {
273 cmsNext = cmsDelayNext;
274 } else {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000275 cmsNext = std::max(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000276 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
277 cmsNext = cmsDelayNext;
278 }
279
jbauch9674d7c2016-02-19 07:16:16 -0800280 // Wait and multiplex in the meantime
281 if (!ss_->Wait(cmsNext, process_io))
282 return false;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000283
284 // If the specified timeout expired, return
285
286 msCurrent = Time();
287 cmsElapsed = TimeDiff(msCurrent, msStart);
288 if (cmsWait != kForever) {
289 if (cmsElapsed >= cmsWait)
290 return false;
291 }
292 }
293 return false;
294}
295
296void MessageQueue::ReceiveSends() {
297}
298
Peter Boström0c4e06b2015-10-07 12:23:21 +0200299void MessageQueue::Post(MessageHandler* phandler,
300 uint32_t id,
301 MessageData* pdata,
302 bool time_sensitive) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000303 if (fStop_)
304 return;
305
306 // Keep thread safe
307 // Add the message to the end of the queue
308 // Signal for the multiplexer to return
309
jbauch9674d7c2016-02-19 07:16:16 -0800310 CritScope cs(&crit_);
311 Message msg;
312 msg.phandler = phandler;
313 msg.message_id = id;
314 msg.pdata = pdata;
315 if (time_sensitive) {
316 msg.ts_sensitive = Time() + kMaxMsgLatency;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000317 }
jbauch9674d7c2016-02-19 07:16:16 -0800318 msgq_.push_back(msg);
319 ss_->WakeUp();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000320}
321
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000322void MessageQueue::PostDelayed(int cmsDelay,
323 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200324 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000325 MessageData* pdata) {
326 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
327}
328
Peter Boström0c4e06b2015-10-07 12:23:21 +0200329void MessageQueue::PostAt(uint32_t tstamp,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000330 MessageHandler* phandler,
Peter Boström0c4e06b2015-10-07 12:23:21 +0200331 uint32_t id,
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000332 MessageData* pdata) {
333 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
334}
335
Peter Boström0c4e06b2015-10-07 12:23:21 +0200336void MessageQueue::DoDelayPost(int cmsDelay,
337 uint32_t tstamp,
338 MessageHandler* phandler,
339 uint32_t id,
340 MessageData* pdata) {
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000341 if (fStop_)
342 return;
343
344 // Keep thread safe
345 // Add to the priority queue. Gets sorted soonest first.
346 // Signal for the multiplexer to return.
347
jbauch9674d7c2016-02-19 07:16:16 -0800348 CritScope cs(&crit_);
349 Message msg;
350 msg.phandler = phandler;
351 msg.message_id = id;
352 msg.pdata = pdata;
353 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
354 dmsgq_.push(dmsg);
355 // If this message queue processes 1 message every millisecond for 50 days,
356 // we will wrap this number. Even then, only messages with identical times
357 // will be misordered, and then only briefly. This is probably ok.
358 VERIFY(0 != ++dmsgq_next_num_);
359 ss_->WakeUp();
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000360}
361
362int MessageQueue::GetDelay() {
363 CritScope cs(&crit_);
364
365 if (!msgq_.empty())
366 return 0;
367
368 if (!dmsgq_.empty()) {
369 int delay = TimeUntil(dmsgq_.top().msTrigger_);
370 if (delay < 0)
371 delay = 0;
372 return delay;
373 }
374
375 return kForever;
376}
377
Peter Boström0c4e06b2015-10-07 12:23:21 +0200378void MessageQueue::Clear(MessageHandler* phandler,
379 uint32_t id,
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000380 MessageList* removed) {
381 CritScope cs(&crit_);
382
383 // Remove messages with phandler
384
385 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
386 if (removed) {
387 removed->push_back(msgPeek_);
388 } else {
389 delete msgPeek_.pdata;
390 }
391 fPeekKeep_ = false;
392 }
393
394 // Remove from ordered message queue
395
396 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
397 if (it->Match(phandler, id)) {
398 if (removed) {
399 removed->push_back(*it);
400 } else {
401 delete it->pdata;
402 }
403 it = msgq_.erase(it);
404 } else {
405 ++it;
406 }
407 }
408
409 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000410
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000411 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
412 for (PriorityQueue::container_type::iterator it = new_end;
413 it != dmsgq_.container().end(); ++it) {
414 if (it->msg_.Match(phandler, id)) {
415 if (removed) {
416 removed->push_back(it->msg_);
417 } else {
418 delete it->msg_.pdata;
419 }
420 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000421 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000422 }
423 }
424 dmsgq_.container().erase(new_end, dmsgq_.container().end());
425 dmsgq_.reheap();
426}
427
428void MessageQueue::Dispatch(Message *pmsg) {
429 pmsg->phandler->OnMessage(pmsg);
430}
431
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000432} // namespace rtc