tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
| 3 | * |
| 4 | * Use of this source code is governed by a BSD-style license |
| 5 | * that can be found in the LICENSE file in the root of the source |
| 6 | * tree. An additional intellectual property rights grant can be found |
| 7 | * in the file PATENTS. All contributing project authors may |
| 8 | * be found in the AUTHORS file in the root of the source tree. |
| 9 | */ |
| 10 | |
| 11 | #include "webrtc/base/task_queue.h" |
| 12 | |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 13 | #include <mmsystem.h> |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 14 | #include <string.h> |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 15 | |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 16 | #include <algorithm> |
| 17 | |
| 18 | #include "webrtc/base/arraysize.h" |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 19 | #include "webrtc/base/checks.h" |
| 20 | #include "webrtc/base/logging.h" |
| 21 | |
| 22 | namespace rtc { |
| 23 | namespace { |
// Private thread-message identifiers used by TaskQueue.
//  - WM_RUN_TASK: lParam carries a QueuedTask* that should be run right away.
//  - WM_QUEUE_DELAYED_TASK: lParam carries a QueuedTask*, wParam carries the
//    requested delay in milliseconds (plus a tick-count stamp on Win64).
// The expansions are parenthesized so that each macro behaves as a single
// operand when it is used inside a larger expression.
#define WM_RUN_TASK (WM_USER + 1)
#define WM_QUEUE_DELAYED_TASK (WM_USER + 2)
| 26 | |
| 27 | DWORD g_queue_ptr_tls = 0; |
| 28 | |
| 29 | BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) { |
| 30 | g_queue_ptr_tls = TlsAlloc(); |
| 31 | return TRUE; |
| 32 | } |
| 33 | |
| 34 | DWORD GetQueuePtrTls() { |
| 35 | static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT; |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 36 | ::InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 37 | return g_queue_ptr_tls; |
| 38 | } |
| 39 | |
| 40 | struct ThreadStartupData { |
| 41 | Event* started; |
| 42 | void* thread_context; |
| 43 | }; |
| 44 | |
| 45 | void CALLBACK InitializeQueueThread(ULONG_PTR param) { |
| 46 | MSG msg; |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 47 | ::PeekMessage(&msg, nullptr, WM_USER, WM_USER, PM_NOREMOVE); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 48 | ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param); |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 49 | ::TlsSetValue(GetQueuePtrTls(), data->thread_context); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 50 | data->started->Set(); |
| 51 | } |
| 52 | } // namespace |
| 53 | |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 54 | class TaskQueue::MultimediaTimer { |
| 55 | public: |
| 56 | // kMaxTimers defines the limit of how many MultimediaTimer instances should |
| 57 | // be created. |
| 58 | // Background: The maximum number of supported handles for Wait functions, is |
| 59 | // MAXIMUM_WAIT_OBJECTS - 1 (63). |
| 60 | // There are some ways to work around the limitation but as it turns out, the |
| 61 | // limit of concurrently active multimedia timers per process, is much lower, |
| 62 | // or 16. So there isn't much value in going to the lenghts required to |
| 63 | // overcome the Wait limitations. |
| 64 | // kMaxTimers is larger than 16 though since it is possible that 'complete' or |
| 65 | // signaled timers that haven't been handled, are counted as part of |
| 66 | // kMaxTimers and thus a multimedia timer can actually be queued even though |
| 67 | // as far as we're concerned, there are more than 16 that are pending. |
| 68 | static const int kMaxTimers = MAXIMUM_WAIT_OBJECTS - 1; |
| 69 | |
| 70 | // Controls how many MultimediaTimer instances a queue can hold before |
| 71 | // attempting to garbage collect (GC) timers that aren't in use. |
| 72 | static const int kInstanceThresholdGC = 8; |
| 73 | |
| 74 | MultimediaTimer() : event_(::CreateEvent(nullptr, false, false, nullptr)) {} |
| 75 | |
| 76 | MultimediaTimer(MultimediaTimer&& timer) |
| 77 | : event_(timer.event_), |
| 78 | timer_id_(timer.timer_id_), |
| 79 | task_(std::move(timer.task_)) { |
| 80 | RTC_DCHECK(event_); |
| 81 | timer.event_ = nullptr; |
| 82 | timer.timer_id_ = 0; |
| 83 | } |
| 84 | |
| 85 | ~MultimediaTimer() { Close(); } |
| 86 | |
| 87 | // Implementing this operator is required because of the way |
| 88 | // some stl algorithms work, such as std::rotate(). |
| 89 | MultimediaTimer& operator=(MultimediaTimer&& timer) { |
| 90 | if (this != &timer) { |
| 91 | Close(); |
| 92 | event_ = timer.event_; |
| 93 | timer.event_ = nullptr; |
| 94 | task_ = std::move(timer.task_); |
| 95 | timer_id_ = timer.timer_id_; |
| 96 | timer.timer_id_ = 0; |
| 97 | } |
| 98 | return *this; |
| 99 | } |
| 100 | |
| 101 | bool StartOneShotTimer(std::unique_ptr<QueuedTask> task, UINT delay_ms) { |
| 102 | RTC_DCHECK_EQ(0, timer_id_); |
| 103 | RTC_DCHECK(event_ != nullptr); |
| 104 | RTC_DCHECK(!task_.get()); |
| 105 | RTC_DCHECK(task.get()); |
| 106 | task_ = std::move(task); |
| 107 | timer_id_ = |
| 108 | ::timeSetEvent(delay_ms, 0, reinterpret_cast<LPTIMECALLBACK>(event_), 0, |
| 109 | TIME_ONESHOT | TIME_CALLBACK_EVENT_SET); |
| 110 | return timer_id_ != 0; |
| 111 | } |
| 112 | |
| 113 | std::unique_ptr<QueuedTask> Cancel() { |
| 114 | if (timer_id_) { |
| 115 | ::timeKillEvent(timer_id_); |
| 116 | timer_id_ = 0; |
| 117 | } |
| 118 | return std::move(task_); |
| 119 | } |
| 120 | |
| 121 | void OnEventSignaled() { |
| 122 | RTC_DCHECK_NE(0, timer_id_); |
| 123 | timer_id_ = 0; |
| 124 | task_->Run() ? task_.reset() : static_cast<void>(task_.release()); |
| 125 | } |
| 126 | |
| 127 | HANDLE event() const { return event_; } |
| 128 | |
| 129 | bool is_active() const { return timer_id_ != 0; } |
| 130 | |
| 131 | private: |
| 132 | void Close() { |
| 133 | Cancel(); |
| 134 | |
| 135 | if (event_) { |
| 136 | ::CloseHandle(event_); |
| 137 | event_ = nullptr; |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | HANDLE event_ = nullptr; |
| 142 | MMRESULT timer_id_ = 0; |
| 143 | std::unique_ptr<QueuedTask> task_; |
| 144 | |
| 145 | RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer); |
| 146 | }; |
| 147 | |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 148 | TaskQueue::TaskQueue(const char* queue_name) |
| 149 | : thread_(&TaskQueue::ThreadMain, this, queue_name) { |
| 150 | RTC_DCHECK(queue_name); |
| 151 | thread_.Start(); |
| 152 | Event event(false, false); |
| 153 | ThreadStartupData startup = {&event, this}; |
| 154 | RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread, |
| 155 | reinterpret_cast<ULONG_PTR>(&startup))); |
| 156 | event.Wait(Event::kForever); |
| 157 | } |
| 158 | |
| 159 | TaskQueue::~TaskQueue() { |
| 160 | RTC_DCHECK(!IsCurrent()); |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 161 | while (!::PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) { |
kwiberg | 352444f | 2016-11-28 15:58:53 -0800 | [diff] [blame] | 162 | RTC_CHECK_EQ(ERROR_NOT_ENOUGH_QUOTA, ::GetLastError()); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 163 | Sleep(1); |
| 164 | } |
| 165 | thread_.Stop(); |
| 166 | } |
| 167 | |
| 168 | // static |
| 169 | TaskQueue* TaskQueue::Current() { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 170 | return static_cast<TaskQueue*>(::TlsGetValue(GetQueuePtrTls())); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 171 | } |
| 172 | |
| 173 | // static |
| 174 | bool TaskQueue::IsCurrent(const char* queue_name) { |
| 175 | TaskQueue* current = Current(); |
| 176 | return current && current->thread_.name().compare(queue_name) == 0; |
| 177 | } |
| 178 | |
| 179 | bool TaskQueue::IsCurrent() const { |
| 180 | return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); |
| 181 | } |
| 182 | |
| 183 | void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 184 | if (::PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0, |
| 185 | reinterpret_cast<LPARAM>(task.get()))) { |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 186 | task.release(); |
| 187 | } |
| 188 | } |
| 189 | |
| 190 | void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, |
| 191 | uint32_t milliseconds) { |
| 192 | WPARAM wparam; |
| 193 | #if defined(_WIN64) |
| 194 | // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms) |
| 195 | // so this compensation isn't that accurate, but since we have unused 32 bits |
| 196 | // on Win64, we might as well use them. |
| 197 | wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds; |
| 198 | #else |
| 199 | wparam = milliseconds; |
| 200 | #endif |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 201 | if (::PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam, |
| 202 | reinterpret_cast<LPARAM>(task.get()))) { |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 203 | task.release(); |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 208 | std::unique_ptr<QueuedTask> reply, |
| 209 | TaskQueue* reply_queue) { |
| 210 | QueuedTask* task_ptr = task.release(); |
| 211 | QueuedTask* reply_task_ptr = reply.release(); |
| 212 | DWORD reply_thread_id = reply_queue->thread_.GetThreadRef(); |
| 213 | PostTask([task_ptr, reply_task_ptr, reply_thread_id]() { |
| 214 | if (task_ptr->Run()) |
| 215 | delete task_ptr; |
| 216 | // If the thread's message queue is full, we can't queue the task and will |
| 217 | // have to drop it (i.e. delete). |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 218 | if (!::PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0, |
| 219 | reinterpret_cast<LPARAM>(reply_task_ptr))) { |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 220 | delete reply_task_ptr; |
| 221 | } |
| 222 | }); |
| 223 | } |
| 224 | |
| 225 | void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| 226 | std::unique_ptr<QueuedTask> reply) { |
| 227 | return PostTaskAndReply(std::move(task), std::move(reply), Current()); |
| 228 | } |
| 229 | |
| 230 | // static |
tommi | 0f8b403 | 2017-02-22 11:22:05 -0800 | [diff] [blame] | 231 | void TaskQueue::ThreadMain(void* context) { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 232 | HANDLE timer_handles[MultimediaTimer::kMaxTimers]; |
| 233 | // Active multimedia timers. |
| 234 | std::vector<MultimediaTimer> mm_timers; |
| 235 | // Tasks that have been queued by using SetTimer/WM_TIMER. |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 236 | DelayedTasks delayed_tasks; |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 237 | |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 238 | while (true) { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 239 | RTC_DCHECK(mm_timers.size() <= arraysize(timer_handles)); |
| 240 | DWORD count = 0; |
| 241 | for (const auto& t : mm_timers) { |
| 242 | if (!t.is_active()) |
| 243 | break; |
| 244 | timer_handles[count++] = t.event(); |
| 245 | } |
| 246 | // Make sure we do an alertable wait as that's required to allow APCs to run |
| 247 | // (e.g. required for InitializeQueueThread and stopping the thread in |
| 248 | // PlatformThread). |
| 249 | DWORD result = ::MsgWaitForMultipleObjectsEx(count, timer_handles, INFINITE, |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 250 | QS_ALLEVENTS, MWMO_ALERTABLE); |
| 251 | RTC_CHECK_NE(WAIT_FAILED, result); |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 252 | // If we're not waiting for any timers, then count will be equal to |
| 253 | // WAIT_OBJECT_0. If we're waiting for timers, then |count| represents |
| 254 | // "One more than the number of timers", which means that there's a |
| 255 | // message in the queue that needs to be handled. |
| 256 | // If |result| is less than |count|, then its value will be the index of the |
| 257 | // timer that has been signaled. |
| 258 | if (result == (WAIT_OBJECT_0 + count)) { |
| 259 | if (!ProcessQueuedMessages(&delayed_tasks, &mm_timers)) |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 260 | break; |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 261 | } else if (result < (WAIT_OBJECT_0 + count)) { |
| 262 | mm_timers[result].OnEventSignaled(); |
| 263 | RTC_DCHECK(!mm_timers[result].is_active()); |
| 264 | // Reuse timer events by moving inactive timers to the back of the vector. |
| 265 | // When new delayed tasks are queued, they'll get reused. |
| 266 | if (mm_timers.size() > 1) { |
| 267 | auto it = mm_timers.begin() + result; |
| 268 | std::rotate(it, it + 1, mm_timers.end()); |
| 269 | } |
| 270 | |
| 271 | // Collect some garbage. |
| 272 | if (mm_timers.size() > MultimediaTimer::kInstanceThresholdGC) { |
| 273 | const auto inactive = std::find_if( |
| 274 | mm_timers.begin(), mm_timers.end(), |
| 275 | [](const MultimediaTimer& t) { return !t.is_active(); }); |
| 276 | if (inactive != mm_timers.end()) { |
| 277 | // Since inactive timers are always moved to the back, we can |
| 278 | // safely delete all timers following the first inactive one. |
| 279 | mm_timers.erase(inactive, mm_timers.end()); |
| 280 | } |
| 281 | } |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 282 | } else { |
| 283 | RTC_DCHECK_EQ(WAIT_IO_COMPLETION, result); |
| 284 | } |
| 285 | } |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 286 | } |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 287 | |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 288 | // static |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 289 | bool TaskQueue::ProcessQueuedMessages(DelayedTasks* delayed_tasks, |
| 290 | std::vector<MultimediaTimer>* timers) { |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 291 | MSG msg = {}; |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 292 | while (::PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) && |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 293 | msg.message != WM_QUIT) { |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 294 | if (!msg.hwnd) { |
| 295 | switch (msg.message) { |
| 296 | case WM_RUN_TASK: { |
| 297 | QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam); |
| 298 | if (task->Run()) |
| 299 | delete task; |
| 300 | break; |
| 301 | } |
| 302 | case WM_QUEUE_DELAYED_TASK: { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 303 | std::unique_ptr<QueuedTask> task( |
| 304 | reinterpret_cast<QueuedTask*>(msg.lParam)); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 305 | uint32_t milliseconds = msg.wParam & 0xFFFFFFFF; |
| 306 | #if defined(_WIN64) |
| 307 | // Subtract the time it took to queue the timer. |
| 308 | const DWORD now = GetTickCount(); |
| 309 | DWORD post_time = now - (msg.wParam >> 32); |
| 310 | milliseconds = |
| 311 | post_time > milliseconds ? 0 : milliseconds - post_time; |
| 312 | #endif |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 313 | bool timer_queued = false; |
| 314 | if (timers->size() < MultimediaTimer::kMaxTimers) { |
| 315 | MultimediaTimer* timer = nullptr; |
| 316 | auto available = std::find_if( |
| 317 | timers->begin(), timers->end(), |
| 318 | [](const MultimediaTimer& t) { return !t.is_active(); }); |
| 319 | if (available != timers->end()) { |
| 320 | timer = &(*available); |
| 321 | } else { |
| 322 | timers->emplace_back(); |
| 323 | timer = &timers->back(); |
| 324 | } |
| 325 | |
| 326 | timer_queued = |
| 327 | timer->StartOneShotTimer(std::move(task), milliseconds); |
| 328 | if (!timer_queued) { |
| 329 | // No more multimedia timers can be queued. |
| 330 | // Detach the task and fall back on SetTimer. |
| 331 | task = timer->Cancel(); |
| 332 | } |
| 333 | } |
| 334 | |
| 335 | // When we fail to use multimedia timers, we fall back on the more |
| 336 | // coarse SetTimer/WM_TIMER approach. |
| 337 | if (!timer_queued) { |
| 338 | UINT_PTR timer_id = ::SetTimer(nullptr, 0, milliseconds, nullptr); |
| 339 | delayed_tasks->insert(std::make_pair(timer_id, task.release())); |
| 340 | } |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 341 | break; |
| 342 | } |
| 343 | case WM_TIMER: { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 344 | ::KillTimer(nullptr, msg.wParam); |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 345 | auto found = delayed_tasks->find(msg.wParam); |
| 346 | RTC_DCHECK(found != delayed_tasks->end()); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 347 | if (!found->second->Run()) |
| 348 | found->second.release(); |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 349 | delayed_tasks->erase(found); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 350 | break; |
| 351 | } |
| 352 | default: |
| 353 | RTC_NOTREACHED(); |
| 354 | break; |
| 355 | } |
| 356 | } else { |
tommi | f9d9154 | 2017-02-17 02:47:11 -0800 | [diff] [blame] | 357 | ::TranslateMessage(&msg); |
| 358 | ::DispatchMessage(&msg); |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 359 | } |
| 360 | } |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 361 | return msg.message != WM_QUIT; |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 362 | } |
tommi | b89257a | 2016-07-12 01:24:36 -0700 | [diff] [blame] | 363 | |
tommi | c06b133 | 2016-05-14 11:31:40 -0700 | [diff] [blame] | 364 | } // namespace rtc |