blob: 81b1cd1b9f390ce177369602f50638249300b527 [file] [log] [blame]
/*
 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
10
11#include "webrtc/base/task_queue.h"
12
13#include <string.h>
tommic06b1332016-05-14 11:31:40 -070014
15#include "webrtc/base/checks.h"
16#include "webrtc/base/logging.h"
17
18namespace rtc {
19namespace {
// Private thread-message identifiers, offset from WM_USER.
//  - WM_RUN_TASK: lParam carries a QueuedTask* to run.
//  - WM_QUEUE_DELAYED_TASK: lParam carries a QueuedTask*, wParam the delay.
// The expansions are parenthesized so each macro behaves as a single value
// when it appears inside a larger expression.
#define WM_RUN_TASK (WM_USER + 1)
#define WM_QUEUE_DELAYED_TASK (WM_USER + 2)
22
23DWORD g_queue_ptr_tls = 0;
24
25BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
26 g_queue_ptr_tls = TlsAlloc();
27 return TRUE;
28}
29
30DWORD GetQueuePtrTls() {
31 static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
32 InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
33 return g_queue_ptr_tls;
34}
35
36struct ThreadStartupData {
37 Event* started;
38 void* thread_context;
39};
40
41void CALLBACK InitializeQueueThread(ULONG_PTR param) {
42 MSG msg;
43 PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
44 ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
45 TlsSetValue(GetQueuePtrTls(), data->thread_context);
46 data->started->Set();
47}
48} // namespace
49
50TaskQueue::TaskQueue(const char* queue_name)
51 : thread_(&TaskQueue::ThreadMain, this, queue_name) {
52 RTC_DCHECK(queue_name);
53 thread_.Start();
54 Event event(false, false);
55 ThreadStartupData startup = {&event, this};
56 RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
57 reinterpret_cast<ULONG_PTR>(&startup)));
58 event.Wait(Event::kForever);
59}
60
61TaskQueue::~TaskQueue() {
62 RTC_DCHECK(!IsCurrent());
63 while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
kwiberg352444f2016-11-28 15:58:53 -080064 RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError());
tommic06b1332016-05-14 11:31:40 -070065 Sleep(1);
66 }
67 thread_.Stop();
68}
69
70// static
71TaskQueue* TaskQueue::Current() {
72 return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls()));
73}
74
75// static
76bool TaskQueue::IsCurrent(const char* queue_name) {
77 TaskQueue* current = Current();
78 return current && current->thread_.name().compare(queue_name) == 0;
79}
80
81bool TaskQueue::IsCurrent() const {
82 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
83}
84
85void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
86 if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
87 reinterpret_cast<LPARAM>(task.get()))) {
88 task.release();
89 }
90}
91
92void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
93 uint32_t milliseconds) {
94 WPARAM wparam;
95#if defined(_WIN64)
96 // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
97 // so this compensation isn't that accurate, but since we have unused 32 bits
98 // on Win64, we might as well use them.
99 wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
100#else
101 wparam = milliseconds;
102#endif
103 if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
104 reinterpret_cast<LPARAM>(task.get()))) {
105 task.release();
106 }
107}
108
109void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
110 std::unique_ptr<QueuedTask> reply,
111 TaskQueue* reply_queue) {
112 QueuedTask* task_ptr = task.release();
113 QueuedTask* reply_task_ptr = reply.release();
114 DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
115 PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
116 if (task_ptr->Run())
117 delete task_ptr;
118 // If the thread's message queue is full, we can't queue the task and will
119 // have to drop it (i.e. delete).
120 if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
121 reinterpret_cast<LPARAM>(reply_task_ptr))) {
122 delete reply_task_ptr;
123 }
124 });
125}
126
127void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
128 std::unique_ptr<QueuedTask> reply) {
129 return PostTaskAndReply(std::move(task), std::move(reply), Current());
130}
131
132// static
133bool TaskQueue::ThreadMain(void* context) {
tommib89257a2016-07-12 01:24:36 -0700134 DelayedTasks delayed_tasks;
135 while (true) {
136 DWORD result = ::MsgWaitForMultipleObjectsEx(0, nullptr, INFINITE,
137 QS_ALLEVENTS, MWMO_ALERTABLE);
138 RTC_CHECK_NE(WAIT_FAILED, result);
139 if (result == WAIT_OBJECT_0) {
140 if (!ProcessQueuedMessages(&delayed_tasks))
141 break;
142 } else {
143 RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result);
144 }
145 }
146 return false;
147}
tommic06b1332016-05-14 11:31:40 -0700148
tommib89257a2016-07-12 01:24:36 -0700149// static
150bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks) {
151 MSG msg = {};
152 while (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) &&
153 msg.message != WM_QUIT) {
tommic06b1332016-05-14 11:31:40 -0700154 if (!msg.hwnd) {
155 switch (msg.message) {
156 case WM_RUN_TASK: {
157 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
158 if (task->Run())
159 delete task;
160 break;
161 }
162 case WM_QUEUE_DELAYED_TASK: {
163 QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
164 uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
165#if defined(_WIN64)
166 // Subtract the time it took to queue the timer.
167 const DWORD now = GetTickCount();
168 DWORD post_time = now - (msg.wParam >> 32);
169 milliseconds =
170 post_time > milliseconds ? 0 : milliseconds - post_time;
171#endif
172 UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr);
tommib89257a2016-07-12 01:24:36 -0700173 delayed_tasks->insert(std::make_pair(timer_id, task));
tommic06b1332016-05-14 11:31:40 -0700174 break;
175 }
176 case WM_TIMER: {
177 KillTimer(nullptr, msg.wParam);
tommib89257a2016-07-12 01:24:36 -0700178 auto found = delayed_tasks->find(msg.wParam);
179 RTC_DCHECK(found != delayed_tasks->end());
tommic06b1332016-05-14 11:31:40 -0700180 if (!found->second->Run())
181 found->second.release();
tommib89257a2016-07-12 01:24:36 -0700182 delayed_tasks->erase(found);
tommic06b1332016-05-14 11:31:40 -0700183 break;
184 }
185 default:
186 RTC_NOTREACHED();
187 break;
188 }
189 } else {
190 TranslateMessage(&msg);
191 DispatchMessage(&msg);
192 }
193 }
tommib89257a2016-07-12 01:24:36 -0700194 return msg.message != WM_QUIT;
tommic06b1332016-05-14 11:31:40 -0700195}
tommib89257a2016-07-12 01:24:36 -0700196
tommic06b1332016-05-14 11:31:40 -0700197} // namespace rtc