blob: bbaf7b9f44b4383feff79fd08697fea4e08c7e6b [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
tommif9d91542017-02-17 02:47:11 -080013#include <mmsystem.h>
tommic06b1332016-05-14 11:31:40 -070014#include <string.h>
tommic06b1332016-05-14 11:31:40 -070015
tommif9d91542017-02-17 02:47:11 -080016#include <algorithm>
17
18#include "webrtc/base/arraysize.h"
tommic06b1332016-05-14 11:31:40 -070019#include "webrtc/base/checks.h"
20#include "webrtc/base/logging.h"
21
22namespace rtc {
23namespace {
// Thread message identifiers used with ::PostThreadMessage to hand work to the
// queue thread. The expansions are parenthesized so that each macro behaves as
// a single value no matter which expression it is used in.
#define WM_RUN_TASK (WM_USER + 1)
#define WM_QUEUE_DELAYED_TASK (WM_USER + 2)
26
tommic9bb7912017-02-24 10:42:14 -080027using Priority = TaskQueue::Priority;
28
tommic06b1332016-05-14 11:31:40 -070029DWORD g_queue_ptr_tls = 0;
30
31BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
32 g_queue_ptr_tls = TlsAlloc();
33 return TRUE;
34}
35
36DWORD GetQueuePtrTls() {
37 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
tommif9d91542017-02-17 02:47:11 -080038 ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
tommic06b1332016-05-14 11:31:40 -070039 return g_queue_ptr_tls;
40}
41
42struct ThreadStartupData {
43 Event* started;
44 void* thread_context;
45};
46
47void CALLBACK InitializeQueueThread(ULONG_PTR param) {
48 MSG msg;
tommif9d91542017-02-17 02:47:11 -080049 ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
tommic06b1332016-05-14 11:31:40 -070050 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
tommif9d91542017-02-17 02:47:11 -080051 ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
tommic06b1332016-05-14 11:31:40 -070052 data->started->Set();
53}
tommic9bb7912017-02-24 10:42:14 -080054
55ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
56 switch (priority) {
57 case Priority::HIGH:
58 return kRealtimePriority;
59 case Priority::LOW:
60 return kLowPriority;
61 case Priority::NORMAL:
62 return kNormalPriority;
63 default:
64 RTC_NOTREACHED();
65 break;
66 }
67 return kNormalPriority;
68}
tommi5bdee472017-03-03 05:20:12 -080069
#if defined(_WIN64)
// Returns timeGetTime(). A timer period of 1 is requested around the read and
// released again afterwards, but only if the request was granted.
DWORD GetTick() {
  static const UINT kPeriod = 1;
  const bool period_raised = (::timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
  const DWORD tick = ::timeGetTime();
  if (period_raised) {
    ::timeEndPeriod(kPeriod);
  }
  return tick;
}
#endif
tommic06b1332016-05-14 11:31:40 -070080} // namespace
81
tommif9d91542017-02-17 02:47:11 -080082class TaskQueue::MultimediaTimer {
83 public:
84 // kMaxTimers defines the limit of how many MultimediaTimer instances should
85 // be created.
86 // Background: The maximum number of supported handles for Wait functions, is
87 // MAXIMUM_WAIT_OBJECTS - 1 (63).
88 // There are some ways to work around the limitation but as it turns out, the
89 // limit of concurrently active multimedia timers per process, is much lower,
90 // or 16. So there isn't much value in going to the lenghts required to
91 // overcome the Wait limitations.
92 // kMaxTimers is larger than 16 though since it is possible that 'complete' or
93 // signaled timers that haven't been handled, are counted as part of
94 // kMaxTimers and thus a multimedia timer can actually be queued even though
95 // as far as we're concerned, there are more than 16 that are pending.
96 static const int kMaxTimers = MAXIMUM_WAIT_OBJECTS - 1;
97
98 // Controls how many MultimediaTimer instances a queue can hold before
99 // attempting to garbage collect (GC) timers that aren't in use.
100 static const int kInstanceThresholdGC = 8;
101
102 MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
103
104 MultimediaTimer(MultimediaTimer&& timer)
105 : event_(timer.event_),
106 timer_id_(timer.timer_id_),
107 task_(std::move(timer.task_)) {
108 RTC_DCHECK(event_);
109 timer.event_ = nullptr;
110 timer.timer_id_ = 0;
111 }
112
113 ~MultimediaTimer() { Close(); }
114
115 // Implementing this operator is required because of the way
116 // some stl algorithms work, such as std::rotate().
117 MultimediaTimer& operator=(MultimediaTimer&& timer) {
118 if (this != &timer) {
119 Close();
120 event_ = timer.event_;
121 timer.event_ = nullptr;
122 task_ = std::move(timer.task_);
123 timer_id_ = timer.timer_id_;
124 timer.timer_id_ = 0;
125 }
126 return *this;
127 }
128
129 bool StartOneShotTimer(std::unique_ptr<QueuedTask> task, UINT delay_ms) {
130 RTC_DCHECK_EQ(0, timer_id_);
131 RTC_DCHECK(event_ != nullptr);
132 RTC_DCHECK(!task_.get());
133 RTC_DCHECK(task.get());
134 task_ = std::move(task);
135 timer_id_ =
136 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
137 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
138 return timer_id_ != 0;
139 }
140
141 std::unique_ptr<QueuedTask> Cancel() {
142 if (timer_id_) {
143 ::timeKillEvent(timer_id_);
144 timer_id_ = 0;
145 }
146 return std::move(task_);
147 }
148
149 void OnEventSignaled() {
150 RTC_DCHECK_NE(0, timer_id_);
151 timer_id_ = 0;
152 task_->Run() ? task_.reset() : static_cast<void>(task_.release());
153 }
154
155 HANDLE event() const { return event_; }
156
157 bool is_active() const { return timer_id_ != 0; }
158
159 private:
160 void Close() {
161 Cancel();
162
163 if (event_) {
164 ::CloseHandle(event_);
165 event_ = nullptr;
166 }
167 }
168
169 HANDLE event_ = nullptr;
170 MMRESULT timer_id_ = 0;
171 std::unique_ptr<QueuedTask> task_;
172
173 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
174};
175
tommic9bb7912017-02-24 10:42:14 -0800176TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
177 : thread_(&TaskQueue::ThreadMain,
178 this,
179 queue_name,
180 TaskQueuePriorityToThreadPriority(priority)) {
tommic06b1332016-05-14 11:31:40 -0700181 RTC_DCHECK(queue_name);
182 thread_.Start();
183 Event event(false, false);
184 ThreadStartupData startup = {&event, this};
185 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
186 reinterpret_cast<ULONG_PTR>(&startup)));
187 event.Wait(Event::kForever);
188}
189
190TaskQueue::~TaskQueue() {
191 RTC_DCHECK(!IsCurrent());
tommif9d91542017-02-17 02:47:11 -0800192 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
kwiberg352444f2016-11-28 15:58:53 -0800193 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
tommic06b1332016-05-14 11:31:40 -0700194 Sleep(1);
195 }
196 thread_.Stop();
197}
198
199// static
200TaskQueue* TaskQueue::Current() {
tommif9d91542017-02-17 02:47:11 -0800201 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls()));
tommic06b1332016-05-14 11:31:40 -0700202}
203
204// static
205bool TaskQueue::IsCurrent(const char* queue_name) {
206 TaskQueue* current = Current();
207 return current && current->thread_.name().compare(queue_name) == 0;
208}
209
210bool TaskQueue::IsCurrent() const {
211 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
212}
213
214void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
tommif9d91542017-02-17 02:47:11 -0800215 if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
216 reinterpret_cast<LPARAM>(task.get()))) {
tommic06b1332016-05-14 11:31:40 -0700217 task.release();
218 }
219}
220
221void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
222 uint32_t milliseconds) {
223 WPARAM wparam;
224#if defined(_WIN64)
225 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
226 // so this compensation isn't that accurate, but since we have unused 32 bits
227 // on Win64, we might as well use them.
tommi5bdee472017-03-03 05:20:12 -0800228 wparam = (static_cast<WPARAM>(GetTick()) << 32) | milliseconds;
tommic06b1332016-05-14 11:31:40 -0700229#else
230 wparam = milliseconds;
231#endif
tommif9d91542017-02-17 02:47:11 -0800232 if (::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
233 reinterpret_cast<LPARAM>(task.get()))) {
tommic06b1332016-05-14 11:31:40 -0700234 task.release();
235 }
236}
237
238void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
239 std::unique_ptr<QueuedTask> reply,
240 TaskQueue* reply_queue) {
241 QueuedTask* task_ptr = task.release();
242 QueuedTask* reply_task_ptr = reply.release();
243 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
244 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
245 if (task_ptr->Run())
246 delete task_ptr;
247 // If the thread's message queue is full, we can't queue the task and will
248 // have to drop it (i.e. delete).
tommif9d91542017-02-17 02:47:11 -0800249 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
250 reinterpret_cast<LPARAM>(reply_task_ptr))) {
tommic06b1332016-05-14 11:31:40 -0700251 delete reply_task_ptr;
252 }
253 });
254}
255
256void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
257 std::unique_ptr<QueuedTask> reply) {
258 return PostTaskAndReply(std::move(task), std::move(reply), Current());
259}
260
261// static
tommi0f8b4032017-02-22 11:22:05 -0800262void TaskQueue::ThreadMain(void* context) {
tommif9d91542017-02-17 02:47:11 -0800263 HANDLE timer_handles[MultimediaTimer::kMaxTimers];
264 // Active multimedia timers.
265 std::vector<MultimediaTimer> mm_timers;
266 // Tasks that have been queued by using SetTimer/WM_TIMER.
tommib89257a2016-07-12 01:24:36 -0700267 DelayedTasks delayed_tasks;
tommif9d91542017-02-17 02:47:11 -0800268
tommib89257a2016-07-12 01:24:36 -0700269 while (true) {
tommif9d91542017-02-17 02:47:11 -0800270 RTC_DCHECK(mm_timers.size() <= arraysize(timer_handles));
271 DWORD count = 0;
272 for (const auto& t : mm_timers) {
273 if (!t.is_active())
274 break;
275 timer_handles[count++] = t.event();
276 }
277 // Make sure we do an alertable wait as that's required to allow APCs to run
278 // (e.g. required for InitializeQueueThread and stopping the thread in
279 // PlatformThread).
280 DWORD result = ::MsgWaitForMultipleObjectsEx(count, timer_handles, INFINITE,
tommib89257a2016-07-12 01:24:36 -0700281 QS_ALLEVENTS, MWMO_ALERTABLE);
282 RTC_CHECK_NE(WAIT_FAILED, result);
tommif9d91542017-02-17 02:47:11 -0800283 // If we're not waiting for any timers, then count will be equal to
284 // WAIT_OBJECT_0. If we're waiting for timers, then |count| represents
285 // "One more than the number of timers", which means that there's a
286 // message in the queue that needs to be handled.
287 // If |result| is less than |count|, then its value will be the index of the
288 // timer that has been signaled.
289 if (result == (WAIT_OBJECT_0 + count)) {
290 if (!ProcessQueuedMessages(&delayed_tasks, &mm_timers))
tommib89257a2016-07-12 01:24:36 -0700291 break;
tommif9d91542017-02-17 02:47:11 -0800292 } else if (result < (WAIT_OBJECT_0 + count)) {
293 mm_timers[result].OnEventSignaled();
294 RTC_DCHECK(!mm_timers[result].is_active());
295 // Reuse timer events by moving inactive timers to the back of the vector.
296 // When new delayed tasks are queued, they'll get reused.
297 if (mm_timers.size() > 1) {
298 auto it = mm_timers.begin() + result;
299 std::rotate(it, it + 1, mm_timers.end());
300 }
301
302 // Collect some garbage.
303 if (mm_timers.size() > MultimediaTimer::kInstanceThresholdGC) {
304 const auto inactive = std::find_if(
305 mm_timers.begin(), mm_timers.end(),
306 [](const MultimediaTimer& t) { return !t.is_active(); });
307 if (inactive != mm_timers.end()) {
308 // Since inactive timers are always moved to the back, we can
309 // safely delete all timers following the first inactive one.
310 mm_timers.erase(inactive, mm_timers.end());
311 }
312 }
tommib89257a2016-07-12 01:24:36 -0700313 } else {
314 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
315 }
316 }
tommib89257a2016-07-12 01:24:36 -0700317}
tommic06b1332016-05-14 11:31:40 -0700318
tommib89257a2016-07-12 01:24:36 -0700319// static
tommif9d91542017-02-17 02:47:11 -0800320bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks,
321 std::vector<MultimediaTimer>* timers) {
tommib89257a2016-07-12 01:24:36 -0700322 MSG msg = {};
tommif9d91542017-02-17 02:47:11 -0800323 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
tommib89257a2016-07-12 01:24:36 -0700324 msg.message != WM_QUIT) {
tommic06b1332016-05-14 11:31:40 -0700325 if (!msg.hwnd) {
326 switch (msg.message) {
327 case WM_RUN_TASK: {
328 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
329 if (task->Run())
330 delete task;
331 break;
332 }
333 case WM_QUEUE_DELAYED_TASK: {
tommif9d91542017-02-17 02:47:11 -0800334 std::unique_ptr<QueuedTask> task(
335 reinterpret_cast<QueuedTask*>(msg.lParam));
tommic06b1332016-05-14 11:31:40 -0700336 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
337#if defined(_WIN64)
338 // Subtract the time it took to queue the timer.
tommi5bdee472017-03-03 05:20:12 -0800339 const DWORD now = GetTick();
tommic06b1332016-05-14 11:31:40 -0700340 DWORD post_time = now - (msg.wParam >> 32);
341 milliseconds =
342 post_time > milliseconds ? 0 : milliseconds - post_time;
343#endif
tommif9d91542017-02-17 02:47:11 -0800344 bool timer_queued = false;
345 if (timers->size() < MultimediaTimer::kMaxTimers) {
346 MultimediaTimer* timer = nullptr;
347 auto available = std::find_if(
348 timers->begin(), timers->end(),
349 [](const MultimediaTimer& t) { return !t.is_active(); });
350 if (available != timers->end()) {
351 timer = &(*available);
352 } else {
353 timers->emplace_back();
354 timer = &timers->back();
355 }
356
357 timer_queued =
358 timer->StartOneShotTimer(std::move(task), milliseconds);
359 if (!timer_queued) {
360 // No more multimedia timers can be queued.
361 // Detach the task and fall back on SetTimer.
362 task = timer->Cancel();
363 }
364 }
365
366 // When we fail to use multimedia timers, we fall back on the more
367 // coarse SetTimer/WM_TIMER approach.
368 if (!timer_queued) {
369 UINT_PTR timer_id = ::SetTimer(nullptr, 0, milliseconds, nullptr);
370 delayed_tasks->insert(std::make_pair(timer_id, task.release()));
371 }
tommic06b1332016-05-14 11:31:40 -0700372 break;
373 }
374 case WM_TIMER: {
tommif9d91542017-02-17 02:47:11 -0800375 ::KillTimer(nullptr, msg.wParam);
tommib89257a2016-07-12 01:24:36 -0700376 auto found = delayed_tasks->find(msg.wParam);
377 RTC_DCHECK(found != delayed_tasks->end());
tommic06b1332016-05-14 11:31:40 -0700378 if (!found->second->Run())
379 found->second.release();
tommib89257a2016-07-12 01:24:36 -0700380 delayed_tasks->erase(found);
tommic06b1332016-05-14 11:31:40 -0700381 break;
382 }
383 default:
384 RTC_NOTREACHED();
385 break;
386 }
387 } else {
tommif9d91542017-02-17 02:47:11 -0800388 ::TranslateMessage(&msg);
389 ::DispatchMessage(&msg);
tommic06b1332016-05-14 11:31:40 -0700390 }
391 }
tommib89257a2016-07-12 01:24:36 -0700392 return msg.message != WM_QUIT;
tommic06b1332016-05-14 11:31:40 -0700393}
tommib89257a2016-07-12 01:24:36 -0700394
tommic06b1332016-05-14 11:31:40 -0700395} // namespace rtc