blob: 0f142ed4595cdeb73e4f6f88efe9221566f23d5b [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Ryan Keanea973b512019-07-29 15:50:39 -07007#include <thread>
8
Jordan Baylesb0c191e2019-03-26 15:49:57 -07009#include "platform/api/logging.h"
10
11namespace openscreen {
12namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070013
btolschd94fe622019-05-09 14:21:40 -070014TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
15 TaskWaiter* event_waiter,
16 Clock::duration waiter_timeout)
17 : now_function_(now_function),
18 is_running_(false),
19 task_waiter_(event_waiter),
20 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070021
Jordan Baylesa8e96772019-04-08 10:53:54 -070022TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070023
Yuri Wiitalab929b832019-06-05 17:13:15 -070024void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070025 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070026 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070027 if (task_waiter_) {
28 task_waiter_->OnTaskPosted();
29 } else {
30 run_loop_wakeup_.notify_one();
31 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070032}
33
Yuri Wiitalab929b832019-06-05 17:13:15 -070034void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
35 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070036 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070037 delayed_tasks_.emplace(
38 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070039 if (task_waiter_) {
40 task_waiter_->OnTaskPosted();
41 } else {
42 run_loop_wakeup_.notify_one();
43 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070044}
45
46void TaskRunnerImpl::RunUntilStopped() {
47 const bool was_running = is_running_.exchange(true);
48 OSP_CHECK(!was_running);
49
50 RunTasksUntilStopped();
51}
52
53void TaskRunnerImpl::RequestStopSoon() {
54 const bool was_running = is_running_.exchange(false);
55
56 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070057 OSP_DVLOG << "Requesting stop...";
btolschd94fe622019-05-09 14:21:40 -070058 if (task_waiter_) {
59 task_waiter_->OnTaskPosted();
60 } else {
61 std::lock_guard<std::mutex> lock(task_mutex_);
62 run_loop_wakeup_.notify_one();
63 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070064 }
65}
66
67void TaskRunnerImpl::RunUntilIdleForTesting() {
68 ScheduleDelayedTasks();
69 RunCurrentTasksForTesting();
70}
71
72void TaskRunnerImpl::RunCurrentTasksForTesting() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070073 {
74 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
75 // take the lock and drain the tasks_ queue. This allows tests to avoid
76 // having to do any multithreading to interact with the queue.
77 std::unique_lock<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070078 OSP_DCHECK(running_tasks_.empty());
79 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -070080 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070081
Ryan Keaneefab2ed2019-07-22 12:36:53 -070082 for (TaskWithMetadata& task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -070083 // Move the task to the stack so that its bound state is freed immediately
84 // after being run.
85 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070086 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070087 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070088}
89
90void TaskRunnerImpl::RunCurrentTasksBlocking() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070091 {
92 // Wait for the lock. If there are no current tasks, we will wait until
93 // a delayed task is ready or a task gets added to the queue.
94 auto lock = WaitForWorkAndAcquireLock();
95 if (!lock) {
96 return;
97 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070098
Yuri Wiitalab929b832019-06-05 17:13:15 -070099 OSP_DCHECK(running_tasks_.empty());
100 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700101 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700102
Yuri Wiitalab929b832019-06-05 17:13:15 -0700103 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Max Yakimakha371bc2b2019-09-04 10:49:17 -0700104 for (TaskWithMetadata& running_task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700105 // Move the task to the stack so that its bound state is freed immediately
106 // after being run.
Max Yakimakha371bc2b2019-09-04 10:49:17 -0700107 TaskWithMetadata task = std::move(running_task);
108 task();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700109 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700110 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700111}
112
113void TaskRunnerImpl::RunTasksUntilStopped() {
114 while (is_running_) {
115 ScheduleDelayedTasks();
116 RunCurrentTasksBlocking();
117 }
118}
119
120void TaskRunnerImpl::ScheduleDelayedTasks() {
121 std::lock_guard<std::mutex> lock(task_mutex_);
122
123 // Getting the time can be expensive on some platforms, so only get it once.
124 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700125 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
126 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
127 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700128 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700129 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700130}
131
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700132bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
133 if (!is_running_) {
134 return true;
135 }
136
137 if (!tasks_.empty()) {
138 return true;
139 }
140
141 return !delayed_tasks_.empty() &&
Yuri Wiitalab929b832019-06-05 17:13:15 -0700142 (delayed_tasks_.begin()->first <= now_function_());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700143}
144
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700145std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700146 // These checks are redundant, as they are a subset of predicates in the
147 // below wait predicate. However, this is more readable and a slight
148 // optimization, as we don't need to take a lock if we aren't running.
149 if (!is_running_) {
btolsch4051e722019-06-07 16:15:17 -0700150 OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700151 return {};
152 }
153
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700154 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700155 if (!tasks_.empty()) {
btolsch4051e722019-06-07 16:15:17 -0700156 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700157 return lock;
158 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700159
btolschd94fe622019-05-09 14:21:40 -0700160 if (task_waiter_) {
161 do {
162 Clock::duration timeout = waiter_timeout_;
163 if (!delayed_tasks_.empty()) {
164 Clock::duration next_task_delta =
Yuri Wiitalab929b832019-06-05 17:13:15 -0700165 delayed_tasks_.begin()->first - now_function_();
btolschd94fe622019-05-09 14:21:40 -0700166 if (next_task_delta < timeout) {
167 timeout = next_task_delta;
168 }
169 }
170 lock.unlock();
171 task_waiter_->WaitForTaskToBePosted(timeout);
172 lock.lock();
173 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700174 } else {
btolschd94fe622019-05-09 14:21:40 -0700175 // Pass a wait predicate to avoid lost or spurious wakeups.
btolschd94fe622019-05-09 14:21:40 -0700176 if (!delayed_tasks_.empty()) {
177 // We don't have any work to do currently, but have some in the
178 // pipe.
btolsch61045d12019-05-09 16:33:58 -0700179 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
btolschd94fe622019-05-09 14:21:40 -0700180 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Yuri Wiitalab929b832019-06-05 17:13:15 -0700181 run_loop_wakeup_.wait_for(lock,
182 delayed_tasks_.begin()->first - now_function_(),
183 wait_predicate);
btolschd94fe622019-05-09 14:21:40 -0700184 } else {
185 // We don't have any work queued.
btolsch61045d12019-05-09 16:33:58 -0700186 const auto wait_predicate = [this] {
187 return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
188 };
btolsch4051e722019-06-07 16:15:17 -0700189 OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
btolschd94fe622019-05-09 14:21:40 -0700190 run_loop_wakeup_.wait(lock, wait_predicate);
191 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700192 }
193
btolsch4051e722019-06-07 16:15:17 -0700194 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700195 return lock;
196}
197} // namespace platform
198} // namespace openscreen