// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "api/impl/task_runner_impl.h"
6
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
11TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
12 : now_function_(now_function), is_running_(false) {}
13
14TaskRunnerImpl::~TaskRunnerImpl() {
15 RequestStopSoon();
16}
17
18void TaskRunnerImpl::PostTask(Task task) {
19 std::lock_guard<std::mutex> lock(task_mutex_);
20 tasks_.push_back(std::move(task));
21 run_loop_wakeup_.notify_one();
22}
23
24void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
25 std::lock_guard<std::mutex> lock(task_mutex_);
26 delayed_tasks_.emplace(std::move(task), now_function_() + delay);
27 run_loop_wakeup_.notify_one();
28}
29
30void TaskRunnerImpl::RunUntilStopped() {
31 const bool was_running = is_running_.exchange(true);
32 OSP_CHECK(!was_running);
33
34 RunTasksUntilStopped();
35}
36
37void TaskRunnerImpl::RequestStopSoon() {
38 const bool was_running = is_running_.exchange(false);
39
40 if (was_running) {
41 std::lock_guard<std::mutex> lock(task_mutex_);
42 run_loop_wakeup_.notify_one();
43 }
44}
45
46void TaskRunnerImpl::RunUntilIdleForTesting() {
47 ScheduleDelayedTasks();
48 RunCurrentTasksForTesting();
49}
50
51void TaskRunnerImpl::RunCurrentTasksForTesting() {
52 std::deque<Task> current_tasks;
53
54 std::unique_lock<std::mutex> lock(task_mutex_);
55 tasks_.swap(current_tasks);
56 lock.unlock();
57
58 for (Task& task : current_tasks) {
59 task();
60 }
61}
62
63void TaskRunnerImpl::RunCurrentTasksBlocking() {
64 std::deque<Task> current_tasks;
65
66 auto lock = WaitForWorkAndAcquireLock();
67 tasks_.swap(current_tasks);
68 lock.unlock();
69
70 for (Task& task : current_tasks) {
71 task();
72 }
73}
74
75void TaskRunnerImpl::RunTasksUntilStopped() {
76 while (is_running_) {
77 ScheduleDelayedTasks();
78 RunCurrentTasksBlocking();
79 }
80}
81
82void TaskRunnerImpl::ScheduleDelayedTasks() {
83 std::lock_guard<std::mutex> lock(task_mutex_);
84
85 // Getting the time can be expensive on some platforms, so only get it once.
86 const auto current_time = now_function_();
87 while (!delayed_tasks_.empty() &&
88 (delayed_tasks_.top().time_runnable_after < current_time)) {
89 tasks_.push_back(std::move(delayed_tasks_.top().task));
90 delayed_tasks_.pop();
91 }
92}
93
94std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
95 std::unique_lock<std::mutex> lock(task_mutex_);
96
97 // Pass a wait predicate to avoid lost or spurious wakeups.
98 const auto wait_predicate = [this] {
99 // Either we got woken up because we aren't running
100 // (probably just to end the thread), or we are running and have tasks to
101 // do.
102 return !this->is_running_ || !this->tasks_.empty() ||
103 !this->delayed_tasks_.empty();
104 };
105
106 // We don't have any work to do currently, but know we have some in the pipe.
107 if (!delayed_tasks_.empty()) {
108 run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().time_runnable_after,
109 wait_predicate);
110
111 // We don't have any work queued.
112 } else if (tasks_.empty()) {
113 run_loop_wakeup_.wait(lock, wait_predicate);
114 }
115
116 return lock;
117}
118} // namespace platform
119} // namespace openscreen