blob: 5cf448e3bdb2dc725fee5c28549acb57320beca6 [file] [log] [blame]
/*
 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
andresp@webrtc.orgff689be2015-02-12 11:54:26 +000015#include <algorithm>
16
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000017#include "webrtc/base/common.h"
18#include "webrtc/base/logging.h"
19#include "webrtc/base/messagequeue.h"
20#if defined(__native_client__)
21#include "webrtc/base/nullsocketserver.h"
22typedef rtc::NullSocketServer DefaultSocketServer;
23#else
24#include "webrtc/base/physicalsocketserver.h"
25typedef rtc::PhysicalSocketServer DefaultSocketServer;
26#endif
27
28namespace rtc {
29
30const uint32 kMaxMsgLatency = 150; // 150 ms
31
//------------------------------------------------------------------
// MessageQueueManager
34
35MessageQueueManager* MessageQueueManager::instance_ = NULL;
36
37MessageQueueManager* MessageQueueManager::Instance() {
38 // Note: This is not thread safe, but it is first called before threads are
39 // spawned.
40 if (!instance_)
41 instance_ = new MessageQueueManager;
42 return instance_;
43}
44
45bool MessageQueueManager::IsInitialized() {
46 return instance_ != NULL;
47}
48
49MessageQueueManager::MessageQueueManager() {
50}
51
52MessageQueueManager::~MessageQueueManager() {
53}
54
55void MessageQueueManager::Add(MessageQueue *message_queue) {
56 return Instance()->AddInternal(message_queue);
57}
58void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
59 // MessageQueueManager methods should be non-reentrant, so we
60 // ASSERT that is the case. If any of these ASSERT, please
61 // contact bpm or jbeda.
Tommi494f2092015-04-27 17:39:23 +020062#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000063 ASSERT(!crit_.CurrentThreadIsOwner());
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000064#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000065 CritScope cs(&crit_);
66 message_queues_.push_back(message_queue);
67}
68
69void MessageQueueManager::Remove(MessageQueue *message_queue) {
70 // If there isn't a message queue manager instance, then there isn't a queue
71 // to remove.
72 if (!instance_) return;
73 return Instance()->RemoveInternal(message_queue);
74}
75void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
Tommi494f2092015-04-27 17:39:23 +020076#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000077 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +000078#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +000079 // If this is the last MessageQueue, destroy the manager as well so that
80 // we don't leak this object at program shutdown. As mentioned above, this is
81 // not thread-safe, but this should only happen at program termination (when
82 // the ThreadManager is destroyed, and threads are no longer active).
83 bool destroy = false;
84 {
85 CritScope cs(&crit_);
86 std::vector<MessageQueue *>::iterator iter;
87 iter = std::find(message_queues_.begin(), message_queues_.end(),
88 message_queue);
89 if (iter != message_queues_.end()) {
90 message_queues_.erase(iter);
91 }
92 destroy = message_queues_.empty();
93 }
94 if (destroy) {
95 instance_ = NULL;
96 delete this;
97 }
98}
99
100void MessageQueueManager::Clear(MessageHandler *handler) {
101 // If there isn't a message queue manager instance, then there aren't any
102 // queues to remove this handler from.
103 if (!instance_) return;
104 return Instance()->ClearInternal(handler);
105}
106void MessageQueueManager::ClearInternal(MessageHandler *handler) {
Tommi494f2092015-04-27 17:39:23 +0200107#if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default.
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000108 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
tommi@webrtc.org679d2f12015-03-07 20:14:50 +0000109#endif
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000110 CritScope cs(&crit_);
111 std::vector<MessageQueue *>::iterator iter;
112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
113 (*iter)->Clear(handler);
114}
115
//------------------------------------------------------------------
// MessageQueue
118
119MessageQueue::MessageQueue(SocketServer* ss)
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000120 : ss_(ss), fStop_(false), fPeekKeep_(false),
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000121 dmsgq_next_num_(0) {
122 if (!ss_) {
123 // Currently, MessageQueue holds a socket server, and is the base class for
124 // Thread. It seems like it makes more sense for Thread to hold the socket
125 // server, and provide it to the MessageQueue, since the Thread controls
126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
127 // messagequeue_unittest to depend on network libraries... yuck.
128 default_ss_.reset(new DefaultSocketServer());
129 ss_ = default_ss_.get();
130 }
131 ss_->SetMessageQueue(this);
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000132 MessageQueueManager::Add(this);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000133}
134
135MessageQueue::~MessageQueue() {
136 // The signal is done from here to ensure
137 // that it always gets called when the queue
138 // is going away.
139 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000140 MessageQueueManager::Remove(this);
141 Clear(NULL);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000142 if (ss_) {
143 ss_->SetMessageQueue(NULL);
144 }
145}
146
147void MessageQueue::set_socketserver(SocketServer* ss) {
148 ss_ = ss ? ss : default_ss_.get();
149 ss_->SetMessageQueue(this);
150}
151
152void MessageQueue::Quit() {
153 fStop_ = true;
154 ss_->WakeUp();
155}
156
157bool MessageQueue::IsQuitting() {
158 return fStop_;
159}
160
161void MessageQueue::Restart() {
162 fStop_ = false;
163}
164
165bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
166 if (fPeekKeep_) {
167 *pmsg = msgPeek_;
168 return true;
169 }
170 if (!Get(pmsg, cmsWait))
171 return false;
172 msgPeek_ = *pmsg;
173 fPeekKeep_ = true;
174 return true;
175}
176
177bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
178 // Return and clear peek if present
179 // Always return the peek if it exists so there is Peek/Get symmetry
180
181 if (fPeekKeep_) {
182 *pmsg = msgPeek_;
183 fPeekKeep_ = false;
184 return true;
185 }
186
187 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
188
189 int cmsTotal = cmsWait;
190 int cmsElapsed = 0;
191 uint32 msStart = Time();
192 uint32 msCurrent = msStart;
193 while (true) {
194 // Check for sent messages
195 ReceiveSends();
196
197 // Check for posted events
198 int cmsDelayNext = kForever;
199 bool first_pass = true;
200 while (true) {
201 // All queue operations need to be locked, but nothing else in this loop
202 // (specifically handling disposed message) can happen inside the crit.
203 // Otherwise, disposed MessageHandlers will cause deadlocks.
204 {
205 CritScope cs(&crit_);
206 // On the first pass, check for delayed messages that have been
207 // triggered and calculate the next trigger time.
208 if (first_pass) {
209 first_pass = false;
210 while (!dmsgq_.empty()) {
211 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
212 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
213 break;
214 }
215 msgq_.push_back(dmsgq_.top().msg_);
216 dmsgq_.pop();
217 }
218 }
219 // Pull a message off the message queue, if available.
220 if (msgq_.empty()) {
221 break;
222 } else {
223 *pmsg = msgq_.front();
224 msgq_.pop_front();
225 }
226 } // crit_ is released here.
227
228 // Log a warning for time-sensitive messages that we're late to deliver.
229 if (pmsg->ts_sensitive) {
230 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
231 if (delay > 0) {
232 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
233 << (delay + kMaxMsgLatency) << "ms";
234 }
235 }
236 // If this was a dispose message, delete it and skip it.
237 if (MQID_DISPOSE == pmsg->message_id) {
238 ASSERT(NULL == pmsg->phandler);
239 delete pmsg->pdata;
240 *pmsg = Message();
241 continue;
242 }
243 return true;
244 }
245
246 if (fStop_)
247 break;
248
249 // Which is shorter, the delay wait or the asked wait?
250
251 int cmsNext;
252 if (cmsWait == kForever) {
253 cmsNext = cmsDelayNext;
254 } else {
andresp@webrtc.orgff689be2015-02-12 11:54:26 +0000255 cmsNext = std::max(0, cmsTotal - cmsElapsed);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000256 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
257 cmsNext = cmsDelayNext;
258 }
259
260 // Wait and multiplex in the meantime
261 if (!ss_->Wait(cmsNext, process_io))
262 return false;
263
264 // If the specified timeout expired, return
265
266 msCurrent = Time();
267 cmsElapsed = TimeDiff(msCurrent, msStart);
268 if (cmsWait != kForever) {
269 if (cmsElapsed >= cmsWait)
270 return false;
271 }
272 }
273 return false;
274}
275
276void MessageQueue::ReceiveSends() {
277}
278
279void MessageQueue::Post(MessageHandler *phandler, uint32 id,
280 MessageData *pdata, bool time_sensitive) {
281 if (fStop_)
282 return;
283
284 // Keep thread safe
285 // Add the message to the end of the queue
286 // Signal for the multiplexer to return
287
288 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000289 Message msg;
290 msg.phandler = phandler;
291 msg.message_id = id;
292 msg.pdata = pdata;
293 if (time_sensitive) {
294 msg.ts_sensitive = Time() + kMaxMsgLatency;
295 }
296 msgq_.push_back(msg);
297 ss_->WakeUp();
298}
299
kwiberg@webrtc.org67186fe2015-03-09 22:21:53 +0000300void MessageQueue::PostDelayed(int cmsDelay,
301 MessageHandler* phandler,
302 uint32 id,
303 MessageData* pdata) {
304 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
305}
306
307void MessageQueue::PostAt(uint32 tstamp,
308 MessageHandler* phandler,
309 uint32 id,
310 MessageData* pdata) {
311 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
312}
313
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000314void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
315 MessageHandler *phandler, uint32 id, MessageData* pdata) {
316 if (fStop_)
317 return;
318
319 // Keep thread safe
320 // Add to the priority queue. Gets sorted soonest first.
321 // Signal for the multiplexer to return.
322
323 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000324 Message msg;
325 msg.phandler = phandler;
326 msg.message_id = id;
327 msg.pdata = pdata;
328 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
329 dmsgq_.push(dmsg);
330 // If this message queue processes 1 message every millisecond for 50 days,
331 // we will wrap this number. Even then, only messages with identical times
332 // will be misordered, and then only briefly. This is probably ok.
333 VERIFY(0 != ++dmsgq_next_num_);
334 ss_->WakeUp();
335}
336
337int MessageQueue::GetDelay() {
338 CritScope cs(&crit_);
339
340 if (!msgq_.empty())
341 return 0;
342
343 if (!dmsgq_.empty()) {
344 int delay = TimeUntil(dmsgq_.top().msTrigger_);
345 if (delay < 0)
346 delay = 0;
347 return delay;
348 }
349
350 return kForever;
351}
352
353void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
354 MessageList* removed) {
355 CritScope cs(&crit_);
356
357 // Remove messages with phandler
358
359 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
360 if (removed) {
361 removed->push_back(msgPeek_);
362 } else {
363 delete msgPeek_.pdata;
364 }
365 fPeekKeep_ = false;
366 }
367
368 // Remove from ordered message queue
369
370 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
371 if (it->Match(phandler, id)) {
372 if (removed) {
373 removed->push_back(*it);
374 } else {
375 delete it->pdata;
376 }
377 it = msgq_.erase(it);
378 } else {
379 ++it;
380 }
381 }
382
383 // Remove from priority queue. Not directly iterable, so use this approach
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000384
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000385 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
386 for (PriorityQueue::container_type::iterator it = new_end;
387 it != dmsgq_.container().end(); ++it) {
388 if (it->msg_.Match(phandler, id)) {
389 if (removed) {
390 removed->push_back(it->msg_);
391 } else {
392 delete it->msg_.pdata;
393 }
394 } else {
decurtis@webrtc.org2af30572015-02-21 01:59:50 +0000395 *new_end++ = *it;
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000396 }
397 }
398 dmsgq_.container().erase(new_end, dmsgq_.container().end());
399 dmsgq_.reheap();
400}
401
402void MessageQueue::Dispatch(Message *pmsg) {
403 pmsg->phandler->OnMessage(pmsg);
404}
405
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000406} // namespace rtc