/*
 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#if defined(WEBRTC_POSIX)
12#include <sys/time.h>
13#endif
14
15#include "webrtc/base/common.h"
16#include "webrtc/base/logging.h"
17#include "webrtc/base/messagequeue.h"
18#if defined(__native_client__)
19#include "webrtc/base/nullsocketserver.h"
20typedef rtc::NullSocketServer DefaultSocketServer;
21#else
22#include "webrtc/base/physicalsocketserver.h"
23typedef rtc::PhysicalSocketServer DefaultSocketServer;
24#endif
25
26namespace rtc {
27
28const uint32 kMaxMsgLatency = 150; // 150 ms
29
30//------------------------------------------------------------------
31// MessageQueueManager
32
33MessageQueueManager* MessageQueueManager::instance_ = NULL;
34
35MessageQueueManager* MessageQueueManager::Instance() {
36 // Note: This is not thread safe, but it is first called before threads are
37 // spawned.
38 if (!instance_)
39 instance_ = new MessageQueueManager;
40 return instance_;
41}
42
43bool MessageQueueManager::IsInitialized() {
44 return instance_ != NULL;
45}
46
47MessageQueueManager::MessageQueueManager() {
48}
49
50MessageQueueManager::~MessageQueueManager() {
51}
52
53void MessageQueueManager::Add(MessageQueue *message_queue) {
54 return Instance()->AddInternal(message_queue);
55}
56void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
57 // MessageQueueManager methods should be non-reentrant, so we
58 // ASSERT that is the case. If any of these ASSERT, please
59 // contact bpm or jbeda.
60 ASSERT(!crit_.CurrentThreadIsOwner());
61 CritScope cs(&crit_);
62 message_queues_.push_back(message_queue);
63}
64
65void MessageQueueManager::Remove(MessageQueue *message_queue) {
66 // If there isn't a message queue manager instance, then there isn't a queue
67 // to remove.
68 if (!instance_) return;
69 return Instance()->RemoveInternal(message_queue);
70}
71void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
72 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
73 // If this is the last MessageQueue, destroy the manager as well so that
74 // we don't leak this object at program shutdown. As mentioned above, this is
75 // not thread-safe, but this should only happen at program termination (when
76 // the ThreadManager is destroyed, and threads are no longer active).
77 bool destroy = false;
78 {
79 CritScope cs(&crit_);
80 std::vector<MessageQueue *>::iterator iter;
81 iter = std::find(message_queues_.begin(), message_queues_.end(),
82 message_queue);
83 if (iter != message_queues_.end()) {
84 message_queues_.erase(iter);
85 }
86 destroy = message_queues_.empty();
87 }
88 if (destroy) {
89 instance_ = NULL;
90 delete this;
91 }
92}
93
94void MessageQueueManager::Clear(MessageHandler *handler) {
95 // If there isn't a message queue manager instance, then there aren't any
96 // queues to remove this handler from.
97 if (!instance_) return;
98 return Instance()->ClearInternal(handler);
99}
100void MessageQueueManager::ClearInternal(MessageHandler *handler) {
101 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
102 CritScope cs(&crit_);
103 std::vector<MessageQueue *>::iterator iter;
104 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
105 (*iter)->Clear(handler);
106}
107
108//------------------------------------------------------------------
109// MessageQueue
110
111MessageQueue::MessageQueue(SocketServer* ss)
112 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
113 dmsgq_next_num_(0) {
114 if (!ss_) {
115 // Currently, MessageQueue holds a socket server, and is the base class for
116 // Thread. It seems like it makes more sense for Thread to hold the socket
117 // server, and provide it to the MessageQueue, since the Thread controls
118 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
119 // messagequeue_unittest to depend on network libraries... yuck.
120 default_ss_.reset(new DefaultSocketServer());
121 ss_ = default_ss_.get();
122 }
123 ss_->SetMessageQueue(this);
124}
125
126MessageQueue::~MessageQueue() {
127 // The signal is done from here to ensure
128 // that it always gets called when the queue
129 // is going away.
130 SignalQueueDestroyed();
131 if (active_) {
132 MessageQueueManager::Remove(this);
133 Clear(NULL);
134 }
135 if (ss_) {
136 ss_->SetMessageQueue(NULL);
137 }
138}
139
140void MessageQueue::set_socketserver(SocketServer* ss) {
141 ss_ = ss ? ss : default_ss_.get();
142 ss_->SetMessageQueue(this);
143}
144
145void MessageQueue::Quit() {
146 fStop_ = true;
147 ss_->WakeUp();
148}
149
150bool MessageQueue::IsQuitting() {
151 return fStop_;
152}
153
154void MessageQueue::Restart() {
155 fStop_ = false;
156}
157
158bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
159 if (fPeekKeep_) {
160 *pmsg = msgPeek_;
161 return true;
162 }
163 if (!Get(pmsg, cmsWait))
164 return false;
165 msgPeek_ = *pmsg;
166 fPeekKeep_ = true;
167 return true;
168}
169
170bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
171 // Return and clear peek if present
172 // Always return the peek if it exists so there is Peek/Get symmetry
173
174 if (fPeekKeep_) {
175 *pmsg = msgPeek_;
176 fPeekKeep_ = false;
177 return true;
178 }
179
180 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
181
182 int cmsTotal = cmsWait;
183 int cmsElapsed = 0;
184 uint32 msStart = Time();
185 uint32 msCurrent = msStart;
186 while (true) {
187 // Check for sent messages
188 ReceiveSends();
189
190 // Check for posted events
191 int cmsDelayNext = kForever;
192 bool first_pass = true;
193 while (true) {
194 // All queue operations need to be locked, but nothing else in this loop
195 // (specifically handling disposed message) can happen inside the crit.
196 // Otherwise, disposed MessageHandlers will cause deadlocks.
197 {
198 CritScope cs(&crit_);
199 // On the first pass, check for delayed messages that have been
200 // triggered and calculate the next trigger time.
201 if (first_pass) {
202 first_pass = false;
203 while (!dmsgq_.empty()) {
204 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
205 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
206 break;
207 }
208 msgq_.push_back(dmsgq_.top().msg_);
209 dmsgq_.pop();
210 }
211 }
212 // Pull a message off the message queue, if available.
213 if (msgq_.empty()) {
214 break;
215 } else {
216 *pmsg = msgq_.front();
217 msgq_.pop_front();
218 }
219 } // crit_ is released here.
220
221 // Log a warning for time-sensitive messages that we're late to deliver.
222 if (pmsg->ts_sensitive) {
223 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
224 if (delay > 0) {
225 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
226 << (delay + kMaxMsgLatency) << "ms";
227 }
228 }
229 // If this was a dispose message, delete it and skip it.
230 if (MQID_DISPOSE == pmsg->message_id) {
231 ASSERT(NULL == pmsg->phandler);
232 delete pmsg->pdata;
233 *pmsg = Message();
234 continue;
235 }
236 return true;
237 }
238
239 if (fStop_)
240 break;
241
242 // Which is shorter, the delay wait or the asked wait?
243
244 int cmsNext;
245 if (cmsWait == kForever) {
246 cmsNext = cmsDelayNext;
247 } else {
248 cmsNext = _max(0, cmsTotal - cmsElapsed);
249 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
250 cmsNext = cmsDelayNext;
251 }
252
253 // Wait and multiplex in the meantime
254 if (!ss_->Wait(cmsNext, process_io))
255 return false;
256
257 // If the specified timeout expired, return
258
259 msCurrent = Time();
260 cmsElapsed = TimeDiff(msCurrent, msStart);
261 if (cmsWait != kForever) {
262 if (cmsElapsed >= cmsWait)
263 return false;
264 }
265 }
266 return false;
267}
268
269void MessageQueue::ReceiveSends() {
270}
271
272void MessageQueue::Post(MessageHandler *phandler, uint32 id,
273 MessageData *pdata, bool time_sensitive) {
274 if (fStop_)
275 return;
276
277 // Keep thread safe
278 // Add the message to the end of the queue
279 // Signal for the multiplexer to return
280
281 CritScope cs(&crit_);
282 EnsureActive();
283 Message msg;
284 msg.phandler = phandler;
285 msg.message_id = id;
286 msg.pdata = pdata;
287 if (time_sensitive) {
288 msg.ts_sensitive = Time() + kMaxMsgLatency;
289 }
290 msgq_.push_back(msg);
291 ss_->WakeUp();
292}
293
294void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
295 MessageHandler *phandler, uint32 id, MessageData* pdata) {
296 if (fStop_)
297 return;
298
299 // Keep thread safe
300 // Add to the priority queue. Gets sorted soonest first.
301 // Signal for the multiplexer to return.
302
303 CritScope cs(&crit_);
304 EnsureActive();
305 Message msg;
306 msg.phandler = phandler;
307 msg.message_id = id;
308 msg.pdata = pdata;
309 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
310 dmsgq_.push(dmsg);
311 // If this message queue processes 1 message every millisecond for 50 days,
312 // we will wrap this number. Even then, only messages with identical times
313 // will be misordered, and then only briefly. This is probably ok.
314 VERIFY(0 != ++dmsgq_next_num_);
315 ss_->WakeUp();
316}
317
318int MessageQueue::GetDelay() {
319 CritScope cs(&crit_);
320
321 if (!msgq_.empty())
322 return 0;
323
324 if (!dmsgq_.empty()) {
325 int delay = TimeUntil(dmsgq_.top().msTrigger_);
326 if (delay < 0)
327 delay = 0;
328 return delay;
329 }
330
331 return kForever;
332}
333
334void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
335 MessageList* removed) {
336 CritScope cs(&crit_);
337
338 // Remove messages with phandler
339
340 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
341 if (removed) {
342 removed->push_back(msgPeek_);
343 } else {
344 delete msgPeek_.pdata;
345 }
346 fPeekKeep_ = false;
347 }
348
349 // Remove from ordered message queue
350
351 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
352 if (it->Match(phandler, id)) {
353 if (removed) {
354 removed->push_back(*it);
355 } else {
356 delete it->pdata;
357 }
358 it = msgq_.erase(it);
359 } else {
360 ++it;
361 }
362 }
363
364 // Remove from priority queue. Not directly iterable, so use this approach
365
366 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
367 for (PriorityQueue::container_type::iterator it = new_end;
368 it != dmsgq_.container().end(); ++it) {
369 if (it->msg_.Match(phandler, id)) {
370 if (removed) {
371 removed->push_back(it->msg_);
372 } else {
373 delete it->msg_.pdata;
374 }
375 } else {
376 *new_end++ = *it;
377 }
378 }
379 dmsgq_.container().erase(new_end, dmsgq_.container().end());
380 dmsgq_.reheap();
381}
382
383void MessageQueue::Dispatch(Message *pmsg) {
384 pmsg->phandler->OnMessage(pmsg);
385}
386
387void MessageQueue::EnsureActive() {
388 ASSERT(crit_.CurrentThreadIsOwner());
389 if (!active_) {
390 active_ = true;
391 MessageQueueManager::Add(this);
392 }
393}
394
395} // namespace rtc