blob: 5c40622162262c4c596841870d23097509cf3cf3 [file] [log] [blame]
henrike@webrtc.org28e20752013-07-10 00:45:36 +00001/*
2 * libjingle
3 * Copyright 2004--2005, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#ifdef POSIX
29#include <sys/time.h>
30#endif
31
32#include "talk/base/common.h"
33#include "talk/base/logging.h"
34#include "talk/base/messagequeue.h"
35#include "talk/base/physicalsocketserver.h"
36
37
38namespace talk_base {
39
// Latency budget, in milliseconds, for messages posted as time-sensitive:
// Post() stamps such a message with Time() + kMaxMsgLatency, and Get() logs a
// warning when the message is dequeued after that deadline.
40const uint32 kMaxMsgLatency = 150; // 150 ms
41
42//------------------------------------------------------------------
43// MessageQueueManager
44
// Pointer to the single manager object. It is allocated on first use by
// Instance() and reset to NULL by Remove() once the last registered queue has
// been removed.
45MessageQueueManager* MessageQueueManager::instance_;
46
47MessageQueueManager* MessageQueueManager::Instance() {
48 // Note: This is not thread safe, but it is first called before threads are
49 // spawned.
50 if (!instance_)
51 instance_ = new MessageQueueManager;
52 return instance_;
53}
54
55MessageQueueManager::MessageQueueManager() {
56}
57
58MessageQueueManager::~MessageQueueManager() {
59}
60
61void MessageQueueManager::Add(MessageQueue *message_queue) {
62 // MessageQueueManager methods should be non-reentrant, so we
63 // ASSERT that is the case. If any of these ASSERT, please
64 // contact bpm or jbeda.
65 ASSERT(!crit_.CurrentThreadIsOwner());
66 CritScope cs(&crit_);
67 message_queues_.push_back(message_queue);
68}
69
70void MessageQueueManager::Remove(MessageQueue *message_queue) {
71 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
72 // If this is the last MessageQueue, destroy the manager as well so that
73 // we don't leak this object at program shutdown. As mentioned above, this is
74 // not thread-safe, but this should only happen at program termination (when
75 // the ThreadManager is destroyed, and threads are no longer active).
76 bool destroy = false;
77 {
78 CritScope cs(&crit_);
79 std::vector<MessageQueue *>::iterator iter;
80 iter = std::find(message_queues_.begin(), message_queues_.end(),
81 message_queue);
82 if (iter != message_queues_.end()) {
83 message_queues_.erase(iter);
84 }
85 destroy = message_queues_.empty();
86 }
87 if (destroy) {
88 instance_ = NULL;
89 delete this;
90 }
91}
92
93void MessageQueueManager::Clear(MessageHandler *handler) {
94 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
95 CritScope cs(&crit_);
96 std::vector<MessageQueue *>::iterator iter;
97 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
98 (*iter)->Clear(handler);
99}
100
101//------------------------------------------------------------------
102// MessageQueue
103
104MessageQueue::MessageQueue(SocketServer* ss)
105 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
106 dmsgq_next_num_(0) {
107 if (!ss_) {
108 // Currently, MessageQueue holds a socket server, and is the base class for
109 // Thread. It seems like it makes more sense for Thread to hold the socket
110 // server, and provide it to the MessageQueue, since the Thread controls
111 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
112 // messagequeue_unittest to depend on network libraries... yuck.
113 default_ss_.reset(new PhysicalSocketServer());
114 ss_ = default_ss_.get();
115 }
116 ss_->SetMessageQueue(this);
117}
118
119MessageQueue::~MessageQueue() {
120 // The signal is done from here to ensure
121 // that it always gets called when the queue
122 // is going away.
123 SignalQueueDestroyed();
124 if (active_) {
125 MessageQueueManager::Instance()->Remove(this);
126 Clear(NULL);
127 }
128 if (ss_) {
129 ss_->SetMessageQueue(NULL);
130 }
131}
132
133void MessageQueue::set_socketserver(SocketServer* ss) {
134 ss_ = ss ? ss : default_ss_.get();
135 ss_->SetMessageQueue(this);
136}
137
138void MessageQueue::Quit() {
139 fStop_ = true;
140 ss_->WakeUp();
141}
142
143bool MessageQueue::IsQuitting() {
144 return fStop_;
145}
146
147void MessageQueue::Restart() {
148 fStop_ = false;
149}
150
151bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
152 if (fPeekKeep_) {
153 *pmsg = msgPeek_;
154 return true;
155 }
156 if (!Get(pmsg, cmsWait))
157 return false;
158 msgPeek_ = *pmsg;
159 fPeekKeep_ = true;
160 return true;
161}
162
// Retrieves the next message into |*pmsg|.
//
// A message cached by Peek() is returned first. Otherwise the function loops:
// it calls ReceiveSends(), moves delayed messages whose trigger time has been
// reached onto the ordered queue, and pops the front of that queue. Dispose
// messages (MQID_DISPOSE) are deleted and skipped rather than returned.
//
// When nothing is ready it waits on the socket server, for at most the time
// remaining of |cmsWait| (milliseconds, or kForever for no limit) and at most
// until the next delayed message is due. |process_io| is forwarded to
// SocketServer::Wait().
//
// Returns true when a message was stored in |*pmsg|. Returns false when the
// stop flag is set and no message is ready, when the socket server's Wait()
// fails, or when |cmsWait| has elapsed.
163bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
164 // Return and clear peek if present
165 // Always return the peek if it exists so there is Peek/Get symmetry
166
167 if (fPeekKeep_) {
168 *pmsg = msgPeek_;
169 fPeekKeep_ = false;
170 return true;
171 }
172
173 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
174
175 int cmsTotal = cmsWait;
176 int cmsElapsed = 0;
177 uint32 msStart = Time();
178 uint32 msCurrent = msStart;
179 while (true) {
180 // Check for sent messages
181 ReceiveSends();
182
183 // Check for posted events
 // cmsDelayNext stays kForever unless a delayed message is still pending.
184 int cmsDelayNext = kForever;
185 bool first_pass = true;
186 while (true) {
187 // All queue operations need to be locked, but nothing else in this loop
188 // (specifically handling disposed message) can happen inside the crit.
189 // Otherwise, disposed MessageHandlers will cause deadlocks.
190 {
191 CritScope cs(&crit_);
192 // On the first pass, check for delayed messages that have been
193 // triggered and calculate the next trigger time.
194 if (first_pass) {
195 first_pass = false;
196 while (!dmsgq_.empty()) {
197 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
198 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
199 break;
200 }
201 msgq_.push_back(dmsgq_.top().msg_);
202 dmsgq_.pop();
203 }
204 }
205 // Pull a message off the message queue, if available.
206 if (msgq_.empty()) {
207 break;
208 } else {
209 *pmsg = msgq_.front();
210 msgq_.pop_front();
211 }
212 } // crit_ is released here.
213
214 // Log a warning for time-sensitive messages that we're late to deliver.
 // ts_sensitive holds the deadline that Post() computed as post time plus
 // kMaxMsgLatency, so adding kMaxMsgLatency back reports time since posting.
215 if (pmsg->ts_sensitive) {
216 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
217 if (delay > 0) {
218 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
219 << (delay + kMaxMsgLatency) << "ms";
220 }
221 }
222 // If this was a dispose message, delete it and skip it.
223 if (MQID_DISPOSE == pmsg->message_id) {
224 ASSERT(NULL == pmsg->phandler);
225 delete pmsg->pdata;
226 *pmsg = Message();
227 continue;
228 }
229 return true;
230 }
231
 // The stop flag is only honored once no message is ready for delivery.
232 if (fStop_)
233 break;
234
235 // Which is shorter, the delay wait or the asked wait?
236
237 int cmsNext;
238 if (cmsWait == kForever) {
239 cmsNext = cmsDelayNext;
240 } else {
241 cmsNext = _max(0, cmsTotal - cmsElapsed);
242 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
243 cmsNext = cmsDelayNext;
244 }
245
246 // Wait and multiplex in the meantime
247 if (!ss_->Wait(cmsNext, process_io))
248 return false;
249
250 // If the specified timeout expired, return
251
252 msCurrent = Time();
253 cmsElapsed = TimeDiff(msCurrent, msStart);
254 if (cmsWait != kForever) {
255 if (cmsElapsed >= cmsWait)
256 return false;
257 }
258 }
259 return false;
260}
261
262void MessageQueue::ReceiveSends() {
263}
264
265void MessageQueue::Post(MessageHandler *phandler, uint32 id,
266 MessageData *pdata, bool time_sensitive) {
267 if (fStop_)
268 return;
269
270 // Keep thread safe
271 // Add the message to the end of the queue
272 // Signal for the multiplexer to return
273
274 CritScope cs(&crit_);
275 EnsureActive();
276 Message msg;
277 msg.phandler = phandler;
278 msg.message_id = id;
279 msg.pdata = pdata;
280 if (time_sensitive) {
281 msg.ts_sensitive = Time() + kMaxMsgLatency;
282 }
283 msgq_.push_back(msg);
284 ss_->WakeUp();
285}
286
287void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
288 MessageHandler *phandler, uint32 id, MessageData* pdata) {
289 if (fStop_)
290 return;
291
292 // Keep thread safe
293 // Add to the priority queue. Gets sorted soonest first.
294 // Signal for the multiplexer to return.
295
296 CritScope cs(&crit_);
297 EnsureActive();
298 Message msg;
299 msg.phandler = phandler;
300 msg.message_id = id;
301 msg.pdata = pdata;
302 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
303 dmsgq_.push(dmsg);
304 // If this message queue processes 1 message every millisecond for 50 days,
305 // we will wrap this number. Even then, only messages with identical times
306 // will be misordered, and then only briefly. This is probably ok.
307 VERIFY(0 != ++dmsgq_next_num_);
308 ss_->WakeUp();
309}
310
311int MessageQueue::GetDelay() {
312 CritScope cs(&crit_);
313
314 if (!msgq_.empty())
315 return 0;
316
317 if (!dmsgq_.empty()) {
318 int delay = TimeUntil(dmsgq_.top().msTrigger_);
319 if (delay < 0)
320 delay = 0;
321 return delay;
322 }
323
324 return kForever;
325}
326
327void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
328 MessageList* removed) {
329 CritScope cs(&crit_);
330
331 // Remove messages with phandler
332
333 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
334 if (removed) {
335 removed->push_back(msgPeek_);
336 } else {
337 delete msgPeek_.pdata;
338 }
339 fPeekKeep_ = false;
340 }
341
342 // Remove from ordered message queue
343
344 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
345 if (it->Match(phandler, id)) {
346 if (removed) {
347 removed->push_back(*it);
348 } else {
349 delete it->pdata;
350 }
351 it = msgq_.erase(it);
352 } else {
353 ++it;
354 }
355 }
356
357 // Remove from priority queue. Not directly iterable, so use this approach
358
359 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
360 for (PriorityQueue::container_type::iterator it = new_end;
361 it != dmsgq_.container().end(); ++it) {
362 if (it->msg_.Match(phandler, id)) {
363 if (removed) {
364 removed->push_back(it->msg_);
365 } else {
366 delete it->msg_.pdata;
367 }
368 } else {
369 *new_end++ = *it;
370 }
371 }
372 dmsgq_.container().erase(new_end, dmsgq_.container().end());
373 dmsgq_.reheap();
374}
375
376void MessageQueue::Dispatch(Message *pmsg) {
377 pmsg->phandler->OnMessage(pmsg);
378}
379
380void MessageQueue::EnsureActive() {
381 ASSERT(crit_.CurrentThreadIsOwner());
382 if (!active_) {
383 active_ = true;
384 MessageQueueManager::Instance()->Add(this);
385 }
386}
387
388} // namespace talk_base