blob: 15b700ffe482afba999907c97da8addcd101ec68 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifdef POSIX
29#include <sys/time.h>
30#endif
31
32#include "talk/base/common.h"
33#include "talk/base/logging.h"
34#include "talk/base/messagequeue.h"
35#include "talk/base/physicalsocketserver.h"
36
37
38namespace talk_base {
39
40const uint32 kMaxMsgLatency = 150; // 150 ms
41
42//------------------------------------------------------------------
43// MessageQueueManager
44
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000045MessageQueueManager* MessageQueueManager::instance_ = NULL;
henrike@webrtc.org28e20752013-07-10 00:45:36 +000046
47MessageQueueManager* MessageQueueManager::Instance() {
48 // Note: This is not thread safe, but it is first called before threads are
49 // spawned.
50 if (!instance_)
51 instance_ = new MessageQueueManager;
52 return instance_;
53}
54
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000055bool MessageQueueManager::IsInitialized() {
56 return instance_ != NULL;
57}
58
henrike@webrtc.org28e20752013-07-10 00:45:36 +000059MessageQueueManager::MessageQueueManager() {
60}
61
62MessageQueueManager::~MessageQueueManager() {
63}
64
65void MessageQueueManager::Add(MessageQueue *message_queue) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000066 return Instance()->AddInternal(message_queue);
67}
68void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +000069 // MessageQueueManager methods should be non-reentrant, so we
70 // ASSERT that is the case. If any of these ASSERT, please
71 // contact bpm or jbeda.
72 ASSERT(!crit_.CurrentThreadIsOwner());
73 CritScope cs(&crit_);
74 message_queues_.push_back(message_queue);
75}
76
77void MessageQueueManager::Remove(MessageQueue *message_queue) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +000078 // If there isn't a message queue manager instance, then there isn't a queue
79 // to remove.
80 if (!instance_) return;
81 return Instance()->RemoveInternal(message_queue);
82}
83void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +000084 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
85 // If this is the last MessageQueue, destroy the manager as well so that
86 // we don't leak this object at program shutdown. As mentioned above, this is
87 // not thread-safe, but this should only happen at program termination (when
88 // the ThreadManager is destroyed, and threads are no longer active).
89 bool destroy = false;
90 {
91 CritScope cs(&crit_);
92 std::vector<MessageQueue *>::iterator iter;
93 iter = std::find(message_queues_.begin(), message_queues_.end(),
94 message_queue);
95 if (iter != message_queues_.end()) {
96 message_queues_.erase(iter);
97 }
98 destroy = message_queues_.empty();
99 }
100 if (destroy) {
101 instance_ = NULL;
102 delete this;
103 }
104}
105
106void MessageQueueManager::Clear(MessageHandler *handler) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +0000107 // If there isn't a message queue manager instance, then there aren't any
108 // queues to remove this handler from.
109 if (!instance_) return;
110 return Instance()->ClearInternal(handler);
111}
112void MessageQueueManager::ClearInternal(MessageHandler *handler) {
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000113 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
114 CritScope cs(&crit_);
115 std::vector<MessageQueue *>::iterator iter;
116 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
117 (*iter)->Clear(handler);
118}
119
120//------------------------------------------------------------------
121// MessageQueue
122
123MessageQueue::MessageQueue(SocketServer* ss)
124 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
125 dmsgq_next_num_(0) {
126 if (!ss_) {
127 // Currently, MessageQueue holds a socket server, and is the base class for
128 // Thread. It seems like it makes more sense for Thread to hold the socket
129 // server, and provide it to the MessageQueue, since the Thread controls
130 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
131 // messagequeue_unittest to depend on network libraries... yuck.
132 default_ss_.reset(new PhysicalSocketServer());
133 ss_ = default_ss_.get();
134 }
135 ss_->SetMessageQueue(this);
136}
137
138MessageQueue::~MessageQueue() {
139 // The signal is done from here to ensure
140 // that it always gets called when the queue
141 // is going away.
142 SignalQueueDestroyed();
143 if (active_) {
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +0000144 MessageQueueManager::Remove(this);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000145 Clear(NULL);
146 }
147 if (ss_) {
148 ss_->SetMessageQueue(NULL);
149 }
150}
151
152void MessageQueue::set_socketserver(SocketServer* ss) {
153 ss_ = ss ? ss : default_ss_.get();
154 ss_->SetMessageQueue(this);
155}
156
157void MessageQueue::Quit() {
158 fStop_ = true;
159 ss_->WakeUp();
160}
161
162bool MessageQueue::IsQuitting() {
163 return fStop_;
164}
165
166void MessageQueue::Restart() {
167 fStop_ = false;
168}
169
170bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
171 if (fPeekKeep_) {
172 *pmsg = msgPeek_;
173 return true;
174 }
175 if (!Get(pmsg, cmsWait))
176 return false;
177 msgPeek_ = *pmsg;
178 fPeekKeep_ = true;
179 return true;
180}
181
182bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
183 // Return and clear peek if present
184 // Always return the peek if it exists so there is Peek/Get symmetry
185
186 if (fPeekKeep_) {
187 *pmsg = msgPeek_;
188 fPeekKeep_ = false;
189 return true;
190 }
191
192 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
193
194 int cmsTotal = cmsWait;
195 int cmsElapsed = 0;
196 uint32 msStart = Time();
197 uint32 msCurrent = msStart;
198 while (true) {
199 // Check for sent messages
200 ReceiveSends();
201
202 // Check for posted events
203 int cmsDelayNext = kForever;
204 bool first_pass = true;
205 while (true) {
206 // All queue operations need to be locked, but nothing else in this loop
207 // (specifically handling disposed message) can happen inside the crit.
208 // Otherwise, disposed MessageHandlers will cause deadlocks.
209 {
210 CritScope cs(&crit_);
211 // On the first pass, check for delayed messages that have been
212 // triggered and calculate the next trigger time.
213 if (first_pass) {
214 first_pass = false;
215 while (!dmsgq_.empty()) {
216 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
217 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
218 break;
219 }
220 msgq_.push_back(dmsgq_.top().msg_);
221 dmsgq_.pop();
222 }
223 }
224 // Pull a message off the message queue, if available.
225 if (msgq_.empty()) {
226 break;
227 } else {
228 *pmsg = msgq_.front();
229 msgq_.pop_front();
230 }
231 } // crit_ is released here.
232
233 // Log a warning for time-sensitive messages that we're late to deliver.
234 if (pmsg->ts_sensitive) {
235 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
236 if (delay > 0) {
237 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
238 << (delay + kMaxMsgLatency) << "ms";
239 }
240 }
241 // If this was a dispose message, delete it and skip it.
242 if (MQID_DISPOSE == pmsg->message_id) {
243 ASSERT(NULL == pmsg->phandler);
244 delete pmsg->pdata;
245 *pmsg = Message();
246 continue;
247 }
248 return true;
249 }
250
251 if (fStop_)
252 break;
253
254 // Which is shorter, the delay wait or the asked wait?
255
256 int cmsNext;
257 if (cmsWait == kForever) {
258 cmsNext = cmsDelayNext;
259 } else {
260 cmsNext = _max(0, cmsTotal - cmsElapsed);
261 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
262 cmsNext = cmsDelayNext;
263 }
264
265 // Wait and multiplex in the meantime
266 if (!ss_->Wait(cmsNext, process_io))
267 return false;
268
269 // If the specified timeout expired, return
270
271 msCurrent = Time();
272 cmsElapsed = TimeDiff(msCurrent, msStart);
273 if (cmsWait != kForever) {
274 if (cmsElapsed >= cmsWait)
275 return false;
276 }
277 }
278 return false;
279}
280
281void MessageQueue::ReceiveSends() {
282}
283
284void MessageQueue::Post(MessageHandler *phandler, uint32 id,
285 MessageData *pdata, bool time_sensitive) {
286 if (fStop_)
287 return;
288
289 // Keep thread safe
290 // Add the message to the end of the queue
291 // Signal for the multiplexer to return
292
293 CritScope cs(&crit_);
294 EnsureActive();
295 Message msg;
296 msg.phandler = phandler;
297 msg.message_id = id;
298 msg.pdata = pdata;
299 if (time_sensitive) {
300 msg.ts_sensitive = Time() + kMaxMsgLatency;
301 }
302 msgq_.push_back(msg);
303 ss_->WakeUp();
304}
305
306void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
307 MessageHandler *phandler, uint32 id, MessageData* pdata) {
308 if (fStop_)
309 return;
310
311 // Keep thread safe
312 // Add to the priority queue. Gets sorted soonest first.
313 // Signal for the multiplexer to return.
314
315 CritScope cs(&crit_);
316 EnsureActive();
317 Message msg;
318 msg.phandler = phandler;
319 msg.message_id = id;
320 msg.pdata = pdata;
321 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
322 dmsgq_.push(dmsg);
323 // If this message queue processes 1 message every millisecond for 50 days,
324 // we will wrap this number. Even then, only messages with identical times
325 // will be misordered, and then only briefly. This is probably ok.
326 VERIFY(0 != ++dmsgq_next_num_);
327 ss_->WakeUp();
328}
329
330int MessageQueue::GetDelay() {
331 CritScope cs(&crit_);
332
333 if (!msgq_.empty())
334 return 0;
335
336 if (!dmsgq_.empty()) {
337 int delay = TimeUntil(dmsgq_.top().msTrigger_);
338 if (delay < 0)
339 delay = 0;
340 return delay;
341 }
342
343 return kForever;
344}
345
346void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
347 MessageList* removed) {
348 CritScope cs(&crit_);
349
350 // Remove messages with phandler
351
352 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
353 if (removed) {
354 removed->push_back(msgPeek_);
355 } else {
356 delete msgPeek_.pdata;
357 }
358 fPeekKeep_ = false;
359 }
360
361 // Remove from ordered message queue
362
363 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
364 if (it->Match(phandler, id)) {
365 if (removed) {
366 removed->push_back(*it);
367 } else {
368 delete it->pdata;
369 }
370 it = msgq_.erase(it);
371 } else {
372 ++it;
373 }
374 }
375
376 // Remove from priority queue. Not directly iterable, so use this approach
377
378 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
379 for (PriorityQueue::container_type::iterator it = new_end;
380 it != dmsgq_.container().end(); ++it) {
381 if (it->msg_.Match(phandler, id)) {
382 if (removed) {
383 removed->push_back(it->msg_);
384 } else {
385 delete it->msg_.pdata;
386 }
387 } else {
388 *new_end++ = *it;
389 }
390 }
391 dmsgq_.container().erase(new_end, dmsgq_.container().end());
392 dmsgq_.reheap();
393}
394
395void MessageQueue::Dispatch(Message *pmsg) {
396 pmsg->phandler->OnMessage(pmsg);
397}
398
399void MessageQueue::EnsureActive() {
400 ASSERT(crit_.CurrentThreadIsOwner());
401 if (!active_) {
402 active_ = true;
sergeyu@chromium.org0be6aa02013-08-23 23:21:25 +0000403 MessageQueueManager::Add(this);
henrike@webrtc.org28e20752013-07-10 00:45:36 +0000404 }
405}
406
407} // namespace talk_base