blob: c8ef7211e895f8cffc47db8619ee1f35114349c2 [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
tommif9d91542017-02-17 02:47:11 -080013#include <mmsystem.h>
tommic06b1332016-05-14 11:31:40 -070014#include <string.h>
tommic06b1332016-05-14 11:31:40 -070015
tommif9d91542017-02-17 02:47:11 -080016#include <algorithm>
17
18#include "webrtc/base/arraysize.h"
tommic06b1332016-05-14 11:31:40 -070019#include "webrtc/base/checks.h"
20#include "webrtc/base/logging.h"
21
22namespace rtc {
23namespace {
24#define WM_RUN_TASK WM_USER + 1
25#define WM_QUEUE_DELAYED_TASK WM_USER + 2
26
27DWORD g_queue_ptr_tls = 0;
28
29BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
30 g_queue_ptr_tls = TlsAlloc();
31 return TRUE;
32}
33
34DWORD GetQueuePtrTls() {
35 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
tommif9d91542017-02-17 02:47:11 -080036 ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
tommic06b1332016-05-14 11:31:40 -070037 return g_queue_ptr_tls;
38}
39
40struct ThreadStartupData {
41 Event* started;
42 void* thread_context;
43};
44
45void CALLBACK InitializeQueueThread(ULONG_PTR param) {
46 MSG msg;
tommif9d91542017-02-17 02:47:11 -080047 ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
tommic06b1332016-05-14 11:31:40 -070048 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
tommif9d91542017-02-17 02:47:11 -080049 ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
tommic06b1332016-05-14 11:31:40 -070050 data->started->Set();
51}
52} // namespace
53
tommif9d91542017-02-17 02:47:11 -080054class TaskQueue::MultimediaTimer {
55 public:
56 // kMaxTimers defines the limit of how many MultimediaTimer instances should
57 // be created.
58 // Background: The maximum number of supported handles for Wait functions, is
59 // MAXIMUM_WAIT_OBJECTS - 1 (63).
60 // There are some ways to work around the limitation but as it turns out, the
61 // limit of concurrently active multimedia timers per process, is much lower,
62 // or 16. So there isn't much value in going to the lenghts required to
63 // overcome the Wait limitations.
64 // kMaxTimers is larger than 16 though since it is possible that 'complete' or
65 // signaled timers that haven't been handled, are counted as part of
66 // kMaxTimers and thus a multimedia timer can actually be queued even though
67 // as far as we're concerned, there are more than 16 that are pending.
68 static const int kMaxTimers = MAXIMUM_WAIT_OBJECTS - 1;
69
70 // Controls how many MultimediaTimer instances a queue can hold before
71 // attempting to garbage collect (GC) timers that aren't in use.
72 static const int kInstanceThresholdGC = 8;
73
74 MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
75
76 MultimediaTimer(MultimediaTimer&& timer)
77 : event_(timer.event_),
78 timer_id_(timer.timer_id_),
79 task_(std::move(timer.task_)) {
80 RTC_DCHECK(event_);
81 timer.event_ = nullptr;
82 timer.timer_id_ = 0;
83 }
84
85 ~MultimediaTimer() { Close(); }
86
87 // Implementing this operator is required because of the way
88 // some stl algorithms work, such as std::rotate().
89 MultimediaTimer& operator=(MultimediaTimer&& timer) {
90 if (this != &timer) {
91 Close();
92 event_ = timer.event_;
93 timer.event_ = nullptr;
94 task_ = std::move(timer.task_);
95 timer_id_ = timer.timer_id_;
96 timer.timer_id_ = 0;
97 }
98 return *this;
99 }
100
101 bool StartOneShotTimer(std::unique_ptr<QueuedTask> task, UINT delay_ms) {
102 RTC_DCHECK_EQ(0, timer_id_);
103 RTC_DCHECK(event_ != nullptr);
104 RTC_DCHECK(!task_.get());
105 RTC_DCHECK(task.get());
106 task_ = std::move(task);
107 timer_id_ =
108 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
109 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
110 return timer_id_ != 0;
111 }
112
113 std::unique_ptr<QueuedTask> Cancel() {
114 if (timer_id_) {
115 ::timeKillEvent(timer_id_);
116 timer_id_ = 0;
117 }
118 return std::move(task_);
119 }
120
121 void OnEventSignaled() {
122 RTC_DCHECK_NE(0, timer_id_);
123 timer_id_ = 0;
124 task_->Run() ? task_.reset() : static_cast<void>(task_.release());
125 }
126
127 HANDLE event() const { return event_; }
128
129 bool is_active() const { return timer_id_ != 0; }
130
131 private:
132 void Close() {
133 Cancel();
134
135 if (event_) {
136 ::CloseHandle(event_);
137 event_ = nullptr;
138 }
139 }
140
141 HANDLE event_ = nullptr;
142 MMRESULT timer_id_ = 0;
143 std::unique_ptr<QueuedTask> task_;
144
145 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
146};
147
tommic06b1332016-05-14 11:31:40 -0700148TaskQueue::TaskQueue(const char* queue_name)
149 : thread_(&TaskQueue::ThreadMain, this, queue_name) {
150 RTC_DCHECK(queue_name);
151 thread_.Start();
152 Event event(false, false);
153 ThreadStartupData startup = {&event, this};
154 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
155 reinterpret_cast<ULONG_PTR>(&startup)));
156 event.Wait(Event::kForever);
157}
158
159TaskQueue::~TaskQueue() {
160 RTC_DCHECK(!IsCurrent());
tommif9d91542017-02-17 02:47:11 -0800161 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
kwiberg352444f2016-11-28 15:58:53 -0800162 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
tommic06b1332016-05-14 11:31:40 -0700163 Sleep(1);
164 }
165 thread_.Stop();
166}
167
168// static
169TaskQueue* TaskQueue::Current() {
tommif9d91542017-02-17 02:47:11 -0800170 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls()));
tommic06b1332016-05-14 11:31:40 -0700171}
172
173// static
174bool TaskQueue::IsCurrent(const char* queue_name) {
175 TaskQueue* current = Current();
176 return current && current->thread_.name().compare(queue_name) == 0;
177}
178
179bool TaskQueue::IsCurrent() const {
180 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
181}
182
183void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
tommif9d91542017-02-17 02:47:11 -0800184 if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
185 reinterpret_cast<LPARAM>(task.get()))) {
tommic06b1332016-05-14 11:31:40 -0700186 task.release();
187 }
188}
189
190void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
191 uint32_t milliseconds) {
192 WPARAM wparam;
193#if defined(_WIN64)
194 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
195 // so this compensation isn't that accurate, but since we have unused 32 bits
196 // on Win64, we might as well use them.
197 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
198#else
199 wparam = milliseconds;
200#endif
tommif9d91542017-02-17 02:47:11 -0800201 if (::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
202 reinterpret_cast<LPARAM>(task.get()))) {
tommic06b1332016-05-14 11:31:40 -0700203 task.release();
204 }
205}
206
207void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
208 std::unique_ptr<QueuedTask> reply,
209 TaskQueue* reply_queue) {
210 QueuedTask* task_ptr = task.release();
211 QueuedTask* reply_task_ptr = reply.release();
212 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
213 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
214 if (task_ptr->Run())
215 delete task_ptr;
216 // If the thread's message queue is full, we can't queue the task and will
217 // have to drop it (i.e. delete).
tommif9d91542017-02-17 02:47:11 -0800218 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
219 reinterpret_cast<LPARAM>(reply_task_ptr))) {
tommic06b1332016-05-14 11:31:40 -0700220 delete reply_task_ptr;
221 }
222 });
223}
224
225void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
226 std::unique_ptr<QueuedTask> reply) {
227 return PostTaskAndReply(std::move(task), std::move(reply), Current());
228}
229
230// static
tommi0f8b4032017-02-22 11:22:05 -0800231void TaskQueue::ThreadMain(void* context) {
tommif9d91542017-02-17 02:47:11 -0800232 HANDLE timer_handles[MultimediaTimer::kMaxTimers];
233 // Active multimedia timers.
234 std::vector<MultimediaTimer> mm_timers;
235 // Tasks that have been queued by using SetTimer/WM_TIMER.
tommib89257a2016-07-12 01:24:36 -0700236 DelayedTasks delayed_tasks;
tommif9d91542017-02-17 02:47:11 -0800237
tommib89257a2016-07-12 01:24:36 -0700238 while (true) {
tommif9d91542017-02-17 02:47:11 -0800239 RTC_DCHECK(mm_timers.size() <= arraysize(timer_handles));
240 DWORD count = 0;
241 for (const auto& t : mm_timers) {
242 if (!t.is_active())
243 break;
244 timer_handles[count++] = t.event();
245 }
246 // Make sure we do an alertable wait as that's required to allow APCs to run
247 // (e.g. required for InitializeQueueThread and stopping the thread in
248 // PlatformThread).
249 DWORD result = ::MsgWaitForMultipleObjectsEx(count, timer_handles, INFINITE,
tommib89257a2016-07-12 01:24:36 -0700250 QS_ALLEVENTS, MWMO_ALERTABLE);
251 RTC_CHECK_NE(WAIT_FAILED, result);
tommif9d91542017-02-17 02:47:11 -0800252 // If we're not waiting for any timers, then count will be equal to
253 // WAIT_OBJECT_0. If we're waiting for timers, then |count| represents
254 // "One more than the number of timers", which means that there's a
255 // message in the queue that needs to be handled.
256 // If |result| is less than |count|, then its value will be the index of the
257 // timer that has been signaled.
258 if (result == (WAIT_OBJECT_0 + count)) {
259 if (!ProcessQueuedMessages(&delayed_tasks, &mm_timers))
tommib89257a2016-07-12 01:24:36 -0700260 break;
tommif9d91542017-02-17 02:47:11 -0800261 } else if (result < (WAIT_OBJECT_0 + count)) {
262 mm_timers[result].OnEventSignaled();
263 RTC_DCHECK(!mm_timers[result].is_active());
264 // Reuse timer events by moving inactive timers to the back of the vector.
265 // When new delayed tasks are queued, they'll get reused.
266 if (mm_timers.size() > 1) {
267 auto it = mm_timers.begin() + result;
268 std::rotate(it, it + 1, mm_timers.end());
269 }
270
271 // Collect some garbage.
272 if (mm_timers.size() > MultimediaTimer::kInstanceThresholdGC) {
273 const auto inactive = std::find_if(
274 mm_timers.begin(), mm_timers.end(),
275 [](const MultimediaTimer& t) { return !t.is_active(); });
276 if (inactive != mm_timers.end()) {
277 // Since inactive timers are always moved to the back, we can
278 // safely delete all timers following the first inactive one.
279 mm_timers.erase(inactive, mm_timers.end());
280 }
281 }
tommib89257a2016-07-12 01:24:36 -0700282 } else {
283 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
284 }
285 }
tommib89257a2016-07-12 01:24:36 -0700286}
tommic06b1332016-05-14 11:31:40 -0700287
tommib89257a2016-07-12 01:24:36 -0700288// static
tommif9d91542017-02-17 02:47:11 -0800289bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks,
290 std::vector<MultimediaTimer>* timers) {
tommib89257a2016-07-12 01:24:36 -0700291 MSG msg = {};
tommif9d91542017-02-17 02:47:11 -0800292 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
tommib89257a2016-07-12 01:24:36 -0700293 msg.message != WM_QUIT) {
tommic06b1332016-05-14 11:31:40 -0700294 if (!msg.hwnd) {
295 switch (msg.message) {
296 case WM_RUN_TASK: {
297 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
298 if (task->Run())
299 delete task;
300 break;
301 }
302 case WM_QUEUE_DELAYED_TASK: {
tommif9d91542017-02-17 02:47:11 -0800303 std::unique_ptr<QueuedTask> task(
304 reinterpret_cast<QueuedTask*>(msg.lParam));
tommic06b1332016-05-14 11:31:40 -0700305 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
306#if defined(_WIN64)
307 // Subtract the time it took to queue the timer.
308 const DWORD now = GetTickCount();
309 DWORD post_time = now - (msg.wParam >> 32);
310 milliseconds =
311 post_time > milliseconds ? 0 : milliseconds - post_time;
312#endif
tommif9d91542017-02-17 02:47:11 -0800313 bool timer_queued = false;
314 if (timers->size() < MultimediaTimer::kMaxTimers) {
315 MultimediaTimer* timer = nullptr;
316 auto available = std::find_if(
317 timers->begin(), timers->end(),
318 [](const MultimediaTimer& t) { return !t.is_active(); });
319 if (available != timers->end()) {
320 timer = &(*available);
321 } else {
322 timers->emplace_back();
323 timer = &timers->back();
324 }
325
326 timer_queued =
327 timer->StartOneShotTimer(std::move(task), milliseconds);
328 if (!timer_queued) {
329 // No more multimedia timers can be queued.
330 // Detach the task and fall back on SetTimer.
331 task = timer->Cancel();
332 }
333 }
334
335 // When we fail to use multimedia timers, we fall back on the more
336 // coarse SetTimer/WM_TIMER approach.
337 if (!timer_queued) {
338 UINT_PTR timer_id = ::SetTimer(nullptr, 0, milliseconds, nullptr);
339 delayed_tasks->insert(std::make_pair(timer_id, task.release()));
340 }
tommic06b1332016-05-14 11:31:40 -0700341 break;
342 }
343 case WM_TIMER: {
tommif9d91542017-02-17 02:47:11 -0800344 ::KillTimer(nullptr, msg.wParam);
tommib89257a2016-07-12 01:24:36 -0700345 auto found = delayed_tasks->find(msg.wParam);
346 RTC_DCHECK(found != delayed_tasks->end());
tommic06b1332016-05-14 11:31:40 -0700347 if (!found->second->Run())
348 found->second.release();
tommib89257a2016-07-12 01:24:36 -0700349 delayed_tasks->erase(found);
tommic06b1332016-05-14 11:31:40 -0700350 break;
351 }
352 default:
353 RTC_NOTREACHED();
354 break;
355 }
356 } else {
tommif9d91542017-02-17 02:47:11 -0800357 ::TranslateMessage(&msg);
358 ::DispatchMessage(&msg);
tommic06b1332016-05-14 11:31:40 -0700359 }
360 }
tommib89257a2016-07-12 01:24:36 -0700361 return msg.message != WM_QUIT;
tommic06b1332016-05-14 11:31:40 -0700362}
tommib89257a2016-07-12 01:24:36 -0700363
tommic06b1332016-05-14 11:31:40 -0700364} // namespace rtc