blob: f7332d76ed8e99a1acc235127d0c92e2e314bb96 [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
tommif9d91542017-02-17 02:47:11 -080013#include <mmsystem.h>
tommic06b1332016-05-14 11:31:40 -070014#include <string.h>
tommic06b1332016-05-14 11:31:40 -070015
tommif9d91542017-02-17 02:47:11 -080016#include <algorithm>
tommi0b942152017-03-10 09:33:53 -080017#include <queue>
tommif9d91542017-02-17 02:47:11 -080018
tommic06b1332016-05-14 11:31:40 -070019#include "webrtc/base/checks.h"
20#include "webrtc/base/logging.h"
tommi0b942152017-03-10 09:33:53 -080021#include "webrtc/base/safe_conversions.h"
22#include "webrtc/base/timeutils.h"
tommic06b1332016-05-14 11:31:40 -070023
24namespace rtc {
25namespace {
26#define WM_RUN_TASK WM_USER + 1
27#define WM_QUEUE_DELAYED_TASK WM_USER + 2
28
tommic9bb7912017-02-24 10:42:14 -080029using Priority = TaskQueue::Priority;
30
tommic06b1332016-05-14 11:31:40 -070031DWORD g_queue_ptr_tls = 0;
32
33BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
34 g_queue_ptr_tls = TlsAlloc();
35 return TRUE;
36}
37
38DWORD GetQueuePtrTls() {
39 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
tommif9d91542017-02-17 02:47:11 -080040 ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
tommic06b1332016-05-14 11:31:40 -070041 return g_queue_ptr_tls;
42}
43
44struct ThreadStartupData {
45 Event* started;
46 void* thread_context;
47};
48
49void CALLBACK InitializeQueueThread(ULONG_PTR param) {
50 MSG msg;
tommif9d91542017-02-17 02:47:11 -080051 ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE);
tommic06b1332016-05-14 11:31:40 -070052 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
tommif9d91542017-02-17 02:47:11 -080053 ::TlsSetValue(GetQueuePtrTls(), data->thread_context);
tommic06b1332016-05-14 11:31:40 -070054 data->started->Set();
55}
tommic9bb7912017-02-24 10:42:14 -080056
57ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
58 switch (priority) {
59 case Priority::HIGH:
60 return kRealtimePriority;
61 case Priority::LOW:
62 return kLowPriority;
63 case Priority::NORMAL:
64 return kNormalPriority;
65 default:
66 RTC_NOTREACHED();
67 break;
68 }
69 return kNormalPriority;
70}
tommi5bdee472017-03-03 05:20:12 -080071
tommi0b942152017-03-10 09:33:53 -080072int64_t GetTick() {
tommi5bdee472017-03-03 05:20:12 -080073 static const UINT kPeriod = 1;
74 bool high_res = (timeBeginPeriod(kPeriod) == TIMERR_NOERROR);
tommi0b942152017-03-10 09:33:53 -080075 int64_t ret = TimeMillis();
tommi5bdee472017-03-03 05:20:12 -080076 if (high_res)
77 timeEndPeriod(kPeriod);
78 return ret;
79}
tommic06b1332016-05-14 11:31:40 -070080
tommi0b942152017-03-10 09:33:53 -080081class DelayedTaskInfo {
tommif9d91542017-02-17 02:47:11 -080082 public:
tommi0b942152017-03-10 09:33:53 -080083 // Default ctor needed to support priority_queue::pop().
84 DelayedTaskInfo() {}
85 DelayedTaskInfo(uint32_t milliseconds, std::unique_ptr<QueuedTask> task)
86 : due_time_(GetTick() + milliseconds), task_(std::move(task)) {}
87 DelayedTaskInfo(DelayedTaskInfo&&) = default;
tommif9d91542017-02-17 02:47:11 -080088
tommi0b942152017-03-10 09:33:53 -080089 // Implement for priority_queue.
90 bool operator>(const DelayedTaskInfo& other) const {
91 return due_time_ > other.due_time_;
92 }
tommif9d91542017-02-17 02:47:11 -080093
tommi0b942152017-03-10 09:33:53 -080094 // Required by priority_queue::pop().
95 DelayedTaskInfo& operator=(DelayedTaskInfo&& other) = default;
96
97 // See below for why this method is const.
98 void Run() const {
99 RTC_DCHECK(due_time_);
100 task_->Run() ? task_.reset() : static_cast<void>(task_.release());
101 }
102
103 int64_t due_time() const { return due_time_; }
104
105 private:
106 int64_t due_time_ = 0; // Absolute timestamp in milliseconds.
107
108 // |task| needs to be mutable because std::priority_queue::top() returns
109 // a const reference and a key in an ordered queue must not be changed.
110 // There are two basic workarounds, one using const_cast, which would also
111 // make the key (|due_time|), non-const and the other is to make the non-key
112 // (|task|), mutable.
113 // Because of this, the |task| variable is made private and can only be
114 // mutated by calling the |Run()| method.
115 mutable std::unique_ptr<QueuedTask> task_;
116};
117
118class MultimediaTimer {
119 public:
tommif9d91542017-02-17 02:47:11 -0800120 MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {}
121
tommi0b942152017-03-10 09:33:53 -0800122 ~MultimediaTimer() {
123 Cancel();
124 ::CloseHandle(event_);
tommif9d91542017-02-17 02:47:11 -0800125 }
126
tommi0b942152017-03-10 09:33:53 -0800127 bool StartOneShotTimer(UINT delay_ms) {
tommif9d91542017-02-17 02:47:11 -0800128 RTC_DCHECK_EQ(0, timer_id_);
129 RTC_DCHECK(event_ != nullptr);
tommif9d91542017-02-17 02:47:11 -0800130 timer_id_ =
131 ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0,
132 TIME_ONESHOT | TIME_CALLBACK_EVENT_SET);
133 return timer_id_ != 0;
134 }
135
tommi0b942152017-03-10 09:33:53 -0800136 void Cancel() {
tommif9d91542017-02-17 02:47:11 -0800137 if (timer_id_) {
138 ::timeKillEvent(timer_id_);
139 timer_id_ = 0;
140 }
tommif9d91542017-02-17 02:47:11 -0800141 }
142
tommi0b942152017-03-10 09:33:53 -0800143 HANDLE* event_for_wait() { return &event_; }
tommif9d91542017-02-17 02:47:11 -0800144
145 private:
tommif9d91542017-02-17 02:47:11 -0800146 HANDLE event_ = nullptr;
147 MMRESULT timer_id_ = 0;
tommif9d91542017-02-17 02:47:11 -0800148
149 RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
150};
151
tommi0b942152017-03-10 09:33:53 -0800152} // namespace
153
154class TaskQueue::ThreadState {
155 public:
156 ThreadState() {}
157 ~ThreadState() {}
158
159 void RunThreadMain();
160
161 private:
162 bool ProcessQueuedMessages();
163 void RunDueTasks();
164 void ScheduleNextTimer();
165 void CancelTimers();
166
167 // Since priority_queue<> by defult orders items in terms of
168 // largest->smallest, using std::less<>, and we want smallest->largest,
169 // we would like to use std::greater<> here. Alas it's only available in
170 // C++14 and later, so we roll our own compare template that that relies on
171 // operator<().
172 template <typename T>
173 struct greater {
174 bool operator()(const T& l, const T& r) { return l > r; }
175 };
176
177 MultimediaTimer timer_;
178 std::priority_queue<DelayedTaskInfo,
179 std::vector<DelayedTaskInfo>,
180 greater<DelayedTaskInfo>>
181 timer_tasks_;
182 UINT_PTR timer_id_ = 0;
183};
184
tommic9bb7912017-02-24 10:42:14 -0800185TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
186 : thread_(&TaskQueue::ThreadMain,
187 this,
188 queue_name,
189 TaskQueuePriorityToThreadPriority(priority)) {
tommic06b1332016-05-14 11:31:40 -0700190 RTC_DCHECK(queue_name);
191 thread_.Start();
192 Event event(false, false);
193 ThreadStartupData startup = {&event, this};
194 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
195 reinterpret_cast<ULONG_PTR>(&startup)));
196 event.Wait(Event::kForever);
197}
198
199TaskQueue::~TaskQueue() {
200 RTC_DCHECK(!IsCurrent());
tommif9d91542017-02-17 02:47:11 -0800201 while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
kwiberg352444f2016-11-28 15:58:53 -0800202 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
tommic06b1332016-05-14 11:31:40 -0700203 Sleep(1);
204 }
205 thread_.Stop();
206}
207
208// static
209TaskQueue* TaskQueue::Current() {
tommif9d91542017-02-17 02:47:11 -0800210 return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls()));
tommic06b1332016-05-14 11:31:40 -0700211}
212
213// static
214bool TaskQueue::IsCurrent(const char* queue_name) {
215 TaskQueue* current = Current();
216 return current && current->thread_.name().compare(queue_name) == 0;
217}
218
219bool TaskQueue::IsCurrent() const {
220 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
221}
222
223void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
tommif9d91542017-02-17 02:47:11 -0800224 if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
225 reinterpret_cast<LPARAM>(task.get()))) {
tommic06b1332016-05-14 11:31:40 -0700226 task.release();
227 }
228}
229
230void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
231 uint32_t milliseconds) {
tommi0b942152017-03-10 09:33:53 -0800232 if (!milliseconds) {
233 PostTask(std::move(task));
234 return;
235 }
236
237 // TODO(tommi): Avoid this allocation. It is currently here since
238 // the timestamp stored in the task info object, is a 64bit timestamp
239 // and WPARAM is 32bits in 32bit builds. Otherwise, we could pass the
240 // task pointer and timestamp as LPARAM and WPARAM.
241 auto* task_info = new DelayedTaskInfo(milliseconds, std::move(task));
242 if (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, 0,
243 reinterpret_cast<LPARAM>(task_info))) {
244 delete task_info;
tommic06b1332016-05-14 11:31:40 -0700245 }
246}
247
248void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
249 std::unique_ptr<QueuedTask> reply,
250 TaskQueue* reply_queue) {
251 QueuedTask* task_ptr = task.release();
252 QueuedTask* reply_task_ptr = reply.release();
253 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
254 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
255 if (task_ptr->Run())
256 delete task_ptr;
257 // If the thread's message queue is full, we can't queue the task and will
258 // have to drop it (i.e. delete).
tommif9d91542017-02-17 02:47:11 -0800259 if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
260 reinterpret_cast<LPARAM>(reply_task_ptr))) {
tommic06b1332016-05-14 11:31:40 -0700261 delete reply_task_ptr;
262 }
263 });
264}
265
266void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
267 std::unique_ptr<QueuedTask> reply) {
268 return PostTaskAndReply(std::move(task), std::move(reply), Current());
269}
270
271// static
tommi0f8b4032017-02-22 11:22:05 -0800272void TaskQueue::ThreadMain(void* context) {
tommi0b942152017-03-10 09:33:53 -0800273 ThreadState state;
274 state.RunThreadMain();
275}
tommif9d91542017-02-17 02:47:11 -0800276
tommi0b942152017-03-10 09:33:53 -0800277void TaskQueue::ThreadState::RunThreadMain() {
tommib89257a2016-07-12 01:24:36 -0700278 while (true) {
tommif9d91542017-02-17 02:47:11 -0800279 // Make sure we do an alertable wait as that's required to allow APCs to run
280 // (e.g. required for InitializeQueueThread and stopping the thread in
281 // PlatformThread).
tommi0b942152017-03-10 09:33:53 -0800282 DWORD result = ::MsgWaitForMultipleObjectsEx(
283 1, timer_.event_for_wait(), INFINITE, QS_ALLEVENTS, MWMO_ALERTABLE);
tommib89257a2016-07-12 01:24:36 -0700284 RTC_CHECK_NE(WAIT_FAILED, result);
tommi0b942152017-03-10 09:33:53 -0800285 if (result == (WAIT_OBJECT_0 + 1)) {
286 // There are messages in the message queue that need to be handled.
287 if (!ProcessQueuedMessages())
tommib89257a2016-07-12 01:24:36 -0700288 break;
tommi0b942152017-03-10 09:33:53 -0800289 } else if (result == WAIT_OBJECT_0) {
290 // The multimedia timer was signaled.
291 timer_.Cancel();
292 RTC_DCHECK(!timer_tasks_.empty());
293 RunDueTasks();
294 ScheduleNextTimer();
tommib89257a2016-07-12 01:24:36 -0700295 } else {
296 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
297 }
298 }
tommib89257a2016-07-12 01:24:36 -0700299}
tommic06b1332016-05-14 11:31:40 -0700300
tommi0b942152017-03-10 09:33:53 -0800301bool TaskQueue::ThreadState::ProcessQueuedMessages() {
tommib89257a2016-07-12 01:24:36 -0700302 MSG msg = {};
tommif9d91542017-02-17 02:47:11 -0800303 while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
tommib89257a2016-07-12 01:24:36 -0700304 msg.message != WM_QUIT) {
tommic06b1332016-05-14 11:31:40 -0700305 if (!msg.hwnd) {
306 switch (msg.message) {
307 case WM_RUN_TASK: {
308 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
309 if (task->Run())
310 delete task;
311 break;
312 }
313 case WM_QUEUE_DELAYED_TASK: {
tommi0b942152017-03-10 09:33:53 -0800314 std::unique_ptr<DelayedTaskInfo> info(
315 reinterpret_cast<DelayedTaskInfo*>(msg.lParam));
316 bool need_to_schedule_timers =
317 timer_tasks_.empty() ||
318 timer_tasks_.top().due_time() > info->due_time();
319 timer_tasks_.emplace(std::move(*info.get()));
320 if (need_to_schedule_timers) {
321 CancelTimers();
322 ScheduleNextTimer();
tommif9d91542017-02-17 02:47:11 -0800323 }
tommic06b1332016-05-14 11:31:40 -0700324 break;
325 }
326 case WM_TIMER: {
tommi0b942152017-03-10 09:33:53 -0800327 RTC_DCHECK_EQ(timer_id_, msg.wParam);
tommif9d91542017-02-17 02:47:11 -0800328 ::KillTimer(nullptr, msg.wParam);
tommi0b942152017-03-10 09:33:53 -0800329 timer_id_ = 0;
330 RunDueTasks();
331 ScheduleNextTimer();
tommic06b1332016-05-14 11:31:40 -0700332 break;
333 }
334 default:
335 RTC_NOTREACHED();
336 break;
337 }
338 } else {
tommif9d91542017-02-17 02:47:11 -0800339 ::TranslateMessage(&msg);
340 ::DispatchMessage(&msg);
tommic06b1332016-05-14 11:31:40 -0700341 }
342 }
tommib89257a2016-07-12 01:24:36 -0700343 return msg.message != WM_QUIT;
tommic06b1332016-05-14 11:31:40 -0700344}
tommib89257a2016-07-12 01:24:36 -0700345
tommi0b942152017-03-10 09:33:53 -0800346void TaskQueue::ThreadState::RunDueTasks() {
347 RTC_DCHECK(!timer_tasks_.empty());
348 auto now = GetTick();
349 do {
350 const auto& top = timer_tasks_.top();
351 if (top.due_time() > now)
352 break;
353 top.Run();
354 timer_tasks_.pop();
355 } while (!timer_tasks_.empty());
356}
357
358void TaskQueue::ThreadState::ScheduleNextTimer() {
359 RTC_DCHECK_EQ(timer_id_, 0);
360 if (timer_tasks_.empty())
361 return;
362
363 const auto& next_task = timer_tasks_.top();
364 int64_t delay_ms = std::max(0ll, next_task.due_time() - GetTick());
365 uint32_t milliseconds = rtc::dchecked_cast<uint32_t>(delay_ms);
366 if (!timer_.StartOneShotTimer(milliseconds))
367 timer_id_ = ::SetTimer(nullptr, 0, milliseconds, nullptr);
368}
369
370void TaskQueue::ThreadState::CancelTimers() {
371 timer_.Cancel();
372 if (timer_id_) {
373 ::KillTimer(nullptr, timer_id_);
374 timer_id_ = 0;
375 }
376}
377
tommic06b1332016-05-14 11:31:40 -0700378} // namespace rtc