blob: 1b312ff7afbf2613c073e909c1cac271420c21fa [file] [log] [blame]
/*
 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
15#include "webrtc/base/common.h"
16#include "webrtc/base/logging.h"
17#include "webrtc/base/messagequeue.h"
18#if defined(__native_client__)
19#include "webrtc/base/nullsocketserver.h"
20typedef rtc::NullSocketServer DefaultSocketServer;
21#else
22#include "webrtc/base/physicalsocketserver.h"
23typedef rtc::PhysicalSocketServer DefaultSocketServer;
24#endif
25
26namespace rtc {
27
28const uint32 kMaxMsgLatency = 150; // 150 ms
29
30//------------------------------------------------------------------
31// MessageQueueManager
32
33MessageQueueManager* MessageQueueManager::instance_ = NULL;
34
35MessageQueueManager* MessageQueueManager::Instance() {
36 // Note: This is not thread safe, but it is first called before threads are
37 // spawned.
38 if (!instance_)
39 instance_ = new MessageQueueManager;
40 return instance_;
41}
42
43bool MessageQueueManager::IsInitialized() {
44 return instance_ != NULL;
45}
46
47MessageQueueManager::MessageQueueManager() {
48}
49
50MessageQueueManager::~MessageQueueManager() {
51}
52
53void MessageQueueManager::Add(MessageQueue *message_queue) {
54 return Instance()->AddInternal(message_queue);
55}
56void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
57 // MessageQueueManager methods should be non-reentrant, so we
58 // ASSERT that is the case. If any of these ASSERT, please
59 // contact bpm or jbeda.
60 ASSERT(!crit_.CurrentThreadIsOwner());
61 CritScope cs(&crit_);
62 message_queues_.push_back(message_queue);
63}
64
65void MessageQueueManager::Remove(MessageQueue *message_queue) {
66 // If there isn't a message queue manager instance, then there isn't a queue
67 // to remove.
68 if (!instance_) return;
69 return Instance()->RemoveInternal(message_queue);
70}
71void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
72 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
73 // If this is the last MessageQueue, destroy the manager as well so that
74 // we don't leak this object at program shutdown. As mentioned above, this is
75 // not thread-safe, but this should only happen at program termination (when
76 // the ThreadManager is destroyed, and threads are no longer active).
77 bool destroy = false;
78 {
79 CritScope cs(&crit_);
80 std::vector<MessageQueue *>::iterator iter;
81 iter = std::find(message_queues_.begin(), message_queues_.end(),
82 message_queue);
83 if (iter != message_queues_.end()) {
84 message_queues_.erase(iter);
85 }
86 destroy = message_queues_.empty();
87 }
88 if (destroy) {
89 instance_ = NULL;
90 delete this;
91 }
92}
93
94void MessageQueueManager::Clear(MessageHandler *handler) {
95 // If there isn't a message queue manager instance, then there aren't any
96 // queues to remove this handler from.
97 if (!instance_) return;
98 return Instance()->ClearInternal(handler);
99}
100void MessageQueueManager::ClearInternal(MessageHandler *handler) {
101 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
102 CritScope cs(&crit_);
103 std::vector<MessageQueue *>::iterator iter;
104 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
105 (*iter)->Clear(handler);
106}
107
108//------------------------------------------------------------------
109// MessageQueue
110
111MessageQueue::MessageQueue(SocketServer* ss)
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000112 : ss_(ss), fStop_(false), fPeekKeep_(false),
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000113 dmsgq_next_num_(0) {
114 if (!ss_) {
115 // Currently, MessageQueue holds a socket server, and is the base class for
116 // Thread. It seems like it makes more sense for Thread to hold the socket
117 // server, and provide it to the MessageQueue, since the Thread controls
118 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
119 // messagequeue_unittest to depend on network libraries... yuck.
120 default_ss_.reset(new DefaultSocketServer());
121 ss_ = default_ss_.get();
122 }
123 ss_->SetMessageQueue(this);
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000124 MessageQueueManager::Add(this);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000125}
126
127MessageQueue::~MessageQueue() {
128 // The signal is done from here to ensure
129 // that it always gets called when the queue
130 // is going away.
131 SignalQueueDestroyed();
henrike@webrtc.org99b41622014-05-21 20:42:17 +0000132 MessageQueueManager::Remove(this);
133 Clear(NULL);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000134 if (ss_) {
135 ss_->SetMessageQueue(NULL);
136 }
137}
138
139void MessageQueue::set_socketserver(SocketServer* ss) {
140 ss_ = ss ? ss : default_ss_.get();
141 ss_->SetMessageQueue(this);
142}
143
144void MessageQueue::Quit() {
145 fStop_ = true;
146 ss_->WakeUp();
147}
148
149bool MessageQueue::IsQuitting() {
150 return fStop_;
151}
152
153void MessageQueue::Restart() {
154 fStop_ = false;
155}
156
157bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
158 if (fPeekKeep_) {
159 *pmsg = msgPeek_;
160 return true;
161 }
162 if (!Get(pmsg, cmsWait))
163 return false;
164 msgPeek_ = *pmsg;
165 fPeekKeep_ = true;
166 return true;
167}
168
169bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
170 // Return and clear peek if present
171 // Always return the peek if it exists so there is Peek/Get symmetry
172
173 if (fPeekKeep_) {
174 *pmsg = msgPeek_;
175 fPeekKeep_ = false;
176 return true;
177 }
178
179 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
180
181 int cmsTotal = cmsWait;
182 int cmsElapsed = 0;
183 uint32 msStart = Time();
184 uint32 msCurrent = msStart;
185 while (true) {
186 // Check for sent messages
187 ReceiveSends();
188
189 // Check for posted events
190 int cmsDelayNext = kForever;
191 bool first_pass = true;
192 while (true) {
193 // All queue operations need to be locked, but nothing else in this loop
194 // (specifically handling disposed message) can happen inside the crit.
195 // Otherwise, disposed MessageHandlers will cause deadlocks.
196 {
197 CritScope cs(&crit_);
198 // On the first pass, check for delayed messages that have been
199 // triggered and calculate the next trigger time.
200 if (first_pass) {
201 first_pass = false;
202 while (!dmsgq_.empty()) {
203 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
204 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
205 break;
206 }
207 msgq_.push_back(dmsgq_.top().msg_);
208 dmsgq_.pop();
209 }
210 }
211 // Pull a message off the message queue, if available.
212 if (msgq_.empty()) {
213 break;
214 } else {
215 *pmsg = msgq_.front();
216 msgq_.pop_front();
217 }
218 } // crit_ is released here.
219
220 // Log a warning for time-sensitive messages that we're late to deliver.
221 if (pmsg->ts_sensitive) {
222 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
223 if (delay > 0) {
224 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
225 << (delay + kMaxMsgLatency) << "ms";
226 }
227 }
228 // If this was a dispose message, delete it and skip it.
229 if (MQID_DISPOSE == pmsg->message_id) {
230 ASSERT(NULL == pmsg->phandler);
231 delete pmsg->pdata;
232 *pmsg = Message();
233 continue;
234 }
235 return true;
236 }
237
238 if (fStop_)
239 break;
240
241 // Which is shorter, the delay wait or the asked wait?
242
243 int cmsNext;
244 if (cmsWait == kForever) {
245 cmsNext = cmsDelayNext;
246 } else {
247 cmsNext = _max(0, cmsTotal - cmsElapsed);
248 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
249 cmsNext = cmsDelayNext;
250 }
251
252 // Wait and multiplex in the meantime
253 if (!ss_->Wait(cmsNext, process_io))
254 return false;
255
256 // If the specified timeout expired, return
257
258 msCurrent = Time();
259 cmsElapsed = TimeDiff(msCurrent, msStart);
260 if (cmsWait != kForever) {
261 if (cmsElapsed >= cmsWait)
262 return false;
263 }
264 }
265 return false;
266}
267
268void MessageQueue::ReceiveSends() {
269}
270
271void MessageQueue::Post(MessageHandler *phandler, uint32 id,
272 MessageData *pdata, bool time_sensitive) {
273 if (fStop_)
274 return;
275
276 // Keep thread safe
277 // Add the message to the end of the queue
278 // Signal for the multiplexer to return
279
280 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000281 Message msg;
282 msg.phandler = phandler;
283 msg.message_id = id;
284 msg.pdata = pdata;
285 if (time_sensitive) {
286 msg.ts_sensitive = Time() + kMaxMsgLatency;
287 }
288 msgq_.push_back(msg);
289 ss_->WakeUp();
290}
291
292void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
293 MessageHandler *phandler, uint32 id, MessageData* pdata) {
294 if (fStop_)
295 return;
296
297 // Keep thread safe
298 // Add to the priority queue. Gets sorted soonest first.
299 // Signal for the multiplexer to return.
300
301 CritScope cs(&crit_);
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000302 Message msg;
303 msg.phandler = phandler;
304 msg.message_id = id;
305 msg.pdata = pdata;
306 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
307 dmsgq_.push(dmsg);
308 // If this message queue processes 1 message every millisecond for 50 days,
309 // we will wrap this number. Even then, only messages with identical times
310 // will be misordered, and then only briefly. This is probably ok.
311 VERIFY(0 != ++dmsgq_next_num_);
312 ss_->WakeUp();
313}
314
315int MessageQueue::GetDelay() {
316 CritScope cs(&crit_);
317
318 if (!msgq_.empty())
319 return 0;
320
321 if (!dmsgq_.empty()) {
322 int delay = TimeUntil(dmsgq_.top().msTrigger_);
323 if (delay < 0)
324 delay = 0;
325 return delay;
326 }
327
328 return kForever;
329}
330
331void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
332 MessageList* removed) {
333 CritScope cs(&crit_);
334
335 // Remove messages with phandler
336
337 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
338 if (removed) {
339 removed->push_back(msgPeek_);
340 } else {
341 delete msgPeek_.pdata;
342 }
343 fPeekKeep_ = false;
344 }
345
346 // Remove from ordered message queue
347
348 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
349 if (it->Match(phandler, id)) {
350 if (removed) {
351 removed->push_back(*it);
352 } else {
353 delete it->pdata;
354 }
355 it = msgq_.erase(it);
356 } else {
357 ++it;
358 }
359 }
360
361 // Remove from priority queue. Not directly iterable, so use this approach
362
363 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
364 for (PriorityQueue::container_type::iterator it = new_end;
365 it != dmsgq_.container().end(); ++it) {
366 if (it->msg_.Match(phandler, id)) {
367 if (removed) {
368 removed->push_back(it->msg_);
369 } else {
370 delete it->msg_.pdata;
371 }
372 } else {
373 *new_end++ = *it;
374 }
375 }
376 dmsgq_.container().erase(new_end, dmsgq_.container().end());
377 dmsgq_.reheap();
378}
379
380void MessageQueue::Dispatch(Message *pmsg) {
381 pmsg->phandler->OnMessage(pmsg);
382}
383
henrike@webrtc.orgf0488722014-05-13 18:00:26 +0000384} // namespace rtc