blob: 64e63ae3dcbd09d30f5b1c3b005bc4899a2c8468 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifdef POSIX
29#include <sys/time.h>
30#endif
31
32#include "talk/base/common.h"
33#include "talk/base/logging.h"
34#include "talk/base/messagequeue.h"
henrika@webrtc.orgaebb1ad2014-01-14 10:00:58 +000035#if defined(__native_client__)
36#include "talk/base/nullsocketserver.h"
37typedef talk_base::NullSocketServer DefaultSocketServer;
38#else
henrike@webrtc.org28e20752013-07-10 00:45:36 +000039#include "talk/base/physicalsocketserver.h"
henrika@webrtc.orgaebb1ad2014-01-14 10:00:58 +000040typedef talk_base::PhysicalSocketServer DefaultSocketServer;
41#endif
henrike@webrtc.org28e20752013-07-10 00:45:36 +000042
43namespace talk_base {
44
45const uint32 kMaxMsgLatency = 150; // 150 ms
46
47//------------------------------------------------------------------
48// MessageQueueManager
49
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000050MessageQueueManager* MessageQueueManager::instance_ = NULL;
henrike@webrtc.org28e20752013-07-10 00:45:36 +000051
52MessageQueueManager* MessageQueueManager::Instance() {
53 // Note: This is not thread safe, but it is first called before threads are
54 // spawned.
55 if (!instance_)
56 instance_ = new MessageQueueManager;
57 return instance_;
58}
59
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000060bool MessageQueueManager::IsInitialized() {
61 return instance_ != NULL;
62}
63
henrike@webrtc.org28e20752013-07-10 00:45:36 +000064MessageQueueManager::MessageQueueManager() {
65}
66
67MessageQueueManager::~MessageQueueManager() {
68}
69
70void MessageQueueManager::Add(MessageQueue *message_queue) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000071 return Instance()->AddInternal(message_queue);
72}
73void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +000074 // MessageQueueManager methods should be non-reentrant, so we
75 // ASSERT that is the case. If any of these ASSERT, please
76 // contact bpm or jbeda.
77 ASSERT(!crit_.CurrentThreadIsOwner());
78 CritScope cs(&crit_);
79 message_queues_.push_back(message_queue);
80}
81
82void MessageQueueManager::Remove(MessageQueue *message_queue) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000083 // If there isn't a message queue manager instance, then there isn't a queue
84 // to remove.
85 if (!instance_) return;
86 return Instance()->RemoveInternal(message_queue);
87}
88void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +000089 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
90 // If this is the last MessageQueue, destroy the manager as well so that
91 // we don't leak this object at program shutdown. As mentioned above, this is
92 // not thread-safe, but this should only happen at program termination (when
93 // the ThreadManager is destroyed, and threads are no longer active).
94 bool destroy = false;
95 {
96 CritScope cs(&crit_);
97 std::vector<MessageQueue *>::iterator iter;
98 iter = std::find(message_queues_.begin(), message_queues_.end(),
99 message_queue);
100 if (iter != message_queues_.end()) {
101 message_queues_.erase(iter);
102 }
103 destroy = message_queues_.empty();
104 }
105 if (destroy) {
106 instance_ = NULL;
107 delete this;
108 }
109}
110
111void MessageQueueManager::Clear(MessageHandler *handler) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +0000112 // If there isn't a message queue manager instance, then there aren't any
113 // queues to remove this handler from.
114 if (!instance_) return;
115 return Instance()->ClearInternal(handler);
116}
117void MessageQueueManager::ClearInternal(MessageHandler *handler) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000118 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
119 CritScope cs(&crit_);
120 std::vector<MessageQueue *>::iterator iter;
121 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
122 (*iter)->Clear(handler);
123}
124
125//------------------------------------------------------------------
126// MessageQueue
127
128MessageQueue::MessageQueue(SocketServer* ss)
129 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
130 dmsgq_next_num_(0) {
131 if (!ss_) {
132 // Currently, MessageQueue holds a socket server, and is the base class for
133 // Thread. It seems like it makes more sense for Thread to hold the socket
134 // server, and provide it to the MessageQueue, since the Thread controls
135 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
136 // messagequeue_unittest to depend on network libraries... yuck.
henrika@webrtc.orgaebb1ad2014-01-14 10:00:58 +0000137 default_ss_.reset(new DefaultSocketServer());
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000138 ss_ = default_ss_.get();
139 }
140 ss_->SetMessageQueue(this);
141}
142
143MessageQueue::~MessageQueue() {
144 // The signal is done from here to ensure
145 // that it always gets called when the queue
146 // is going away.
147 SignalQueueDestroyed();
148 if (active_) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +0000149 MessageQueueManager::Remove(this);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000150 Clear(NULL);
151 }
152 if (ss_) {
153 ss_->SetMessageQueue(NULL);
154 }
155}
156
157void MessageQueue::set_socketserver(SocketServer* ss) {
158 ss_ = ss ? ss : default_ss_.get();
159 ss_->SetMessageQueue(this);
160}
161
162void MessageQueue::Quit() {
163 fStop_ = true;
164 ss_->WakeUp();
165}
166
167bool MessageQueue::IsQuitting() {
168 return fStop_;
169}
170
171void MessageQueue::Restart() {
172 fStop_ = false;
173}
174
175bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
176 if (fPeekKeep_) {
177 *pmsg = msgPeek_;
178 return true;
179 }
180 if (!Get(pmsg, cmsWait))
181 return false;
182 msgPeek_ = *pmsg;
183 fPeekKeep_ = true;
184 return true;
185}
186
187bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
188 // Return and clear peek if present
189 // Always return the peek if it exists so there is Peek/Get symmetry
190
191 if (fPeekKeep_) {
192 *pmsg = msgPeek_;
193 fPeekKeep_ = false;
194 return true;
195 }
196
197 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
198
199 int cmsTotal = cmsWait;
200 int cmsElapsed = 0;
201 uint32 msStart = Time();
202 uint32 msCurrent = msStart;
203 while (true) {
204 // Check for sent messages
205 ReceiveSends();
206
207 // Check for posted events
208 int cmsDelayNext = kForever;
209 bool first_pass = true;
210 while (true) {
211 // All queue operations need to be locked, but nothing else in this loop
212 // (specifically handling disposed message) can happen inside the crit.
213 // Otherwise, disposed MessageHandlers will cause deadlocks.
214 {
215 CritScope cs(&crit_);
216 // On the first pass, check for delayed messages that have been
217 // triggered and calculate the next trigger time.
218 if (first_pass) {
219 first_pass = false;
220 while (!dmsgq_.empty()) {
221 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
222 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
223 break;
224 }
225 msgq_.push_back(dmsgq_.top().msg_);
226 dmsgq_.pop();
227 }
228 }
229 // Pull a message off the message queue, if available.
230 if (msgq_.empty()) {
231 break;
232 } else {
233 *pmsg = msgq_.front();
234 msgq_.pop_front();
235 }
236 } // crit_ is released here.
237
238 // Log a warning for time-sensitive messages that we're late to deliver.
239 if (pmsg->ts_sensitive) {
240 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
241 if (delay > 0) {
242 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
243 << (delay + kMaxMsgLatency) << "ms";
244 }
245 }
246 // If this was a dispose message, delete it and skip it.
247 if (MQID_DISPOSE == pmsg->message_id) {
248 ASSERT(NULL == pmsg->phandler);
249 delete pmsg->pdata;
250 *pmsg = Message();
251 continue;
252 }
253 return true;
254 }
255
256 if (fStop_)
257 break;
258
259 // Which is shorter, the delay wait or the asked wait?
260
261 int cmsNext;
262 if (cmsWait == kForever) {
263 cmsNext = cmsDelayNext;
264 } else {
265 cmsNext = _max(0, cmsTotal - cmsElapsed);
266 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
267 cmsNext = cmsDelayNext;
268 }
269
270 // Wait and multiplex in the meantime
271 if (!ss_->Wait(cmsNext, process_io))
272 return false;
273
274 // If the specified timeout expired, return
275
276 msCurrent = Time();
277 cmsElapsed = TimeDiff(msCurrent, msStart);
278 if (cmsWait != kForever) {
279 if (cmsElapsed >= cmsWait)
280 return false;
281 }
282 }
283 return false;
284}
285
286void MessageQueue::ReceiveSends() {
287}
288
289void MessageQueue::Post(MessageHandler *phandler, uint32 id,
290 MessageData *pdata, bool time_sensitive) {
291 if (fStop_)
292 return;
293
294 // Keep thread safe
295 // Add the message to the end of the queue
296 // Signal for the multiplexer to return
297
298 CritScope cs(&crit_);
299 EnsureActive();
300 Message msg;
301 msg.phandler = phandler;
302 msg.message_id = id;
303 msg.pdata = pdata;
304 if (time_sensitive) {
305 msg.ts_sensitive = Time() + kMaxMsgLatency;
306 }
307 msgq_.push_back(msg);
308 ss_->WakeUp();
309}
310
311void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
312 MessageHandler *phandler, uint32 id, MessageData* pdata) {
313 if (fStop_)
314 return;
315
316 // Keep thread safe
317 // Add to the priority queue. Gets sorted soonest first.
318 // Signal for the multiplexer to return.
319
320 CritScope cs(&crit_);
321 EnsureActive();
322 Message msg;
323 msg.phandler = phandler;
324 msg.message_id = id;
325 msg.pdata = pdata;
326 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
327 dmsgq_.push(dmsg);
328 // If this message queue processes 1 message every millisecond for 50 days,
329 // we will wrap this number. Even then, only messages with identical times
330 // will be misordered, and then only briefly. This is probably ok.
331 VERIFY(0 != ++dmsgq_next_num_);
332 ss_->WakeUp();
333}
334
335int MessageQueue::GetDelay() {
336 CritScope cs(&crit_);
337
338 if (!msgq_.empty())
339 return 0;
340
341 if (!dmsgq_.empty()) {
342 int delay = TimeUntil(dmsgq_.top().msTrigger_);
343 if (delay < 0)
344 delay = 0;
345 return delay;
346 }
347
348 return kForever;
349}
350
351void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
352 MessageList* removed) {
353 CritScope cs(&crit_);
354
355 // Remove messages with phandler
356
357 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
358 if (removed) {
359 removed->push_back(msgPeek_);
360 } else {
361 delete msgPeek_.pdata;
362 }
363 fPeekKeep_ = false;
364 }
365
366 // Remove from ordered message queue
367
368 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
369 if (it->Match(phandler, id)) {
370 if (removed) {
371 removed->push_back(*it);
372 } else {
373 delete it->pdata;
374 }
375 it = msgq_.erase(it);
376 } else {
377 ++it;
378 }
379 }
380
381 // Remove from priority queue. Not directly iterable, so use this approach
382
383 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
384 for (PriorityQueue::container_type::iterator it = new_end;
385 it != dmsgq_.container().end(); ++it) {
386 if (it->msg_.Match(phandler, id)) {
387 if (removed) {
388 removed->push_back(it->msg_);
389 } else {
390 delete it->msg_.pdata;
391 }
392 } else {
393 *new_end++ = *it;
394 }
395 }
396 dmsgq_.container().erase(new_end, dmsgq_.container().end());
397 dmsgq_.reheap();
398}
399
400void MessageQueue::Dispatch(Message *pmsg) {
401 pmsg->phandler->OnMessage(pmsg);
402}
403
404void MessageQueue::EnsureActive() {
405 ASSERT(crit_.CurrentThreadIsOwner());
406 if (!active_) {
407 active_ = true;
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +0000408 MessageQueueManager::Add(this);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000409 }
410}
411
412} // namespace talk_base