blob: b9e6ad8ba65fa726eb3b5484e36f0bf167b2ed0f [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Ryan Keanea973b512019-07-29 15:50:39 -07007#include <thread>
8
Jordan Baylesb0c191e2019-03-26 15:49:57 -07009#include "platform/api/logging.h"
10
11namespace openscreen {
12namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070013
Ryan Keaneefab2ed2019-07-22 12:36:53 -070014TaskRunnerImpl::TaskWithMetadata::TaskWithMetadata(Task task)
15 : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {}
16
17void TaskRunnerImpl::TaskWithMetadata::operator()() {
18 TRACE_SET_HIERARCHY(trace_ids_);
19 std::move(task_)();
20}
21
btolschd94fe622019-05-09 14:21:40 -070022TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
23 TaskWaiter* event_waiter,
24 Clock::duration waiter_timeout)
25 : now_function_(now_function),
26 is_running_(false),
27 task_waiter_(event_waiter),
28 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070029
Jordan Baylesa8e96772019-04-08 10:53:54 -070030TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070031
Yuri Wiitalab929b832019-06-05 17:13:15 -070032void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070033 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070034 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070035 if (task_waiter_) {
36 task_waiter_->OnTaskPosted();
37 } else {
38 run_loop_wakeup_.notify_one();
39 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070040}
41
Yuri Wiitalab929b832019-06-05 17:13:15 -070042void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
43 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070044 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070045 delayed_tasks_.emplace(
46 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070047 if (task_waiter_) {
48 task_waiter_->OnTaskPosted();
49 } else {
50 run_loop_wakeup_.notify_one();
51 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070052}
53
54void TaskRunnerImpl::RunUntilStopped() {
55 const bool was_running = is_running_.exchange(true);
56 OSP_CHECK(!was_running);
57
58 RunTasksUntilStopped();
59}
60
61void TaskRunnerImpl::RequestStopSoon() {
62 const bool was_running = is_running_.exchange(false);
63
64 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070065 OSP_DVLOG << "Requesting stop...";
btolschd94fe622019-05-09 14:21:40 -070066 if (task_waiter_) {
67 task_waiter_->OnTaskPosted();
68 } else {
69 std::lock_guard<std::mutex> lock(task_mutex_);
70 run_loop_wakeup_.notify_one();
71 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070072 }
73}
74
75void TaskRunnerImpl::RunUntilIdleForTesting() {
76 ScheduleDelayedTasks();
77 RunCurrentTasksForTesting();
78}
79
80void TaskRunnerImpl::RunCurrentTasksForTesting() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070081 {
82 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
83 // take the lock and drain the tasks_ queue. This allows tests to avoid
84 // having to do any multithreading to interact with the queue.
85 std::unique_lock<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070086 OSP_DCHECK(running_tasks_.empty());
87 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -070088 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070089
Ryan Keaneefab2ed2019-07-22 12:36:53 -070090 for (TaskWithMetadata& task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -070091 // Move the task to the stack so that its bound state is freed immediately
92 // after being run.
93 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070094 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070095 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070096}
97
98void TaskRunnerImpl::RunCurrentTasksBlocking() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070099 {
100 // Wait for the lock. If there are no current tasks, we will wait until
101 // a delayed task is ready or a task gets added to the queue.
102 auto lock = WaitForWorkAndAcquireLock();
103 if (!lock) {
104 return;
105 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700106
Yuri Wiitalab929b832019-06-05 17:13:15 -0700107 OSP_DCHECK(running_tasks_.empty());
108 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700109 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700110
Yuri Wiitalab929b832019-06-05 17:13:15 -0700111 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Ryan Keaneefab2ed2019-07-22 12:36:53 -0700112 for (TaskWithMetadata& task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700113 // Move the task to the stack so that its bound state is freed immediately
114 // after being run.
115 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700116 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700117 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700118}
119
120void TaskRunnerImpl::RunTasksUntilStopped() {
121 while (is_running_) {
122 ScheduleDelayedTasks();
123 RunCurrentTasksBlocking();
124 }
125}
126
127void TaskRunnerImpl::ScheduleDelayedTasks() {
128 std::lock_guard<std::mutex> lock(task_mutex_);
129
130 // Getting the time can be expensive on some platforms, so only get it once.
131 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700132 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
133 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
134 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700135 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700136 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700137}
138
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700139bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
140 if (!is_running_) {
141 return true;
142 }
143
144 if (!tasks_.empty()) {
145 return true;
146 }
147
148 return !delayed_tasks_.empty() &&
Yuri Wiitalab929b832019-06-05 17:13:15 -0700149 (delayed_tasks_.begin()->first <= now_function_());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700150}
151
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700152std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700153 // These checks are redundant, as they are a subset of predicates in the
154 // below wait predicate. However, this is more readable and a slight
155 // optimization, as we don't need to take a lock if we aren't running.
156 if (!is_running_) {
btolsch4051e722019-06-07 16:15:17 -0700157 OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700158 return {};
159 }
160
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700161 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700162 if (!tasks_.empty()) {
btolsch4051e722019-06-07 16:15:17 -0700163 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700164 return lock;
165 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700166
btolschd94fe622019-05-09 14:21:40 -0700167 if (task_waiter_) {
168 do {
169 Clock::duration timeout = waiter_timeout_;
170 if (!delayed_tasks_.empty()) {
171 Clock::duration next_task_delta =
Yuri Wiitalab929b832019-06-05 17:13:15 -0700172 delayed_tasks_.begin()->first - now_function_();
btolschd94fe622019-05-09 14:21:40 -0700173 if (next_task_delta < timeout) {
174 timeout = next_task_delta;
175 }
176 }
177 lock.unlock();
178 task_waiter_->WaitForTaskToBePosted(timeout);
179 lock.lock();
180 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700181 } else {
btolschd94fe622019-05-09 14:21:40 -0700182 // Pass a wait predicate to avoid lost or spurious wakeups.
btolschd94fe622019-05-09 14:21:40 -0700183 if (!delayed_tasks_.empty()) {
184 // We don't have any work to do currently, but have some in the
185 // pipe.
btolsch61045d12019-05-09 16:33:58 -0700186 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
btolschd94fe622019-05-09 14:21:40 -0700187 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Yuri Wiitalab929b832019-06-05 17:13:15 -0700188 run_loop_wakeup_.wait_for(lock,
189 delayed_tasks_.begin()->first - now_function_(),
190 wait_predicate);
btolschd94fe622019-05-09 14:21:40 -0700191 } else {
192 // We don't have any work queued.
btolsch61045d12019-05-09 16:33:58 -0700193 const auto wait_predicate = [this] {
194 return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
195 };
btolsch4051e722019-06-07 16:15:17 -0700196 OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
btolschd94fe622019-05-09 14:21:40 -0700197 run_loop_wakeup_.wait(lock, wait_predicate);
198 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700199 }
200
btolsch4051e722019-06-07 16:15:17 -0700201 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700202 return lock;
203}
204} // namespace platform
205} // namespace openscreen