blob: a03842ea938363f00a8875c02be9a87f7123dceb [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Ryan Keanea973b512019-07-29 15:50:39 -07007#include <thread>
8
Jordan Baylesb0c191e2019-03-26 15:49:57 -07009#include "platform/api/logging.h"
10
11namespace openscreen {
12namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070013
btolschd94fe622019-05-09 14:21:40 -070014TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
15 TaskWaiter* event_waiter,
16 Clock::duration waiter_timeout)
17 : now_function_(now_function),
18 is_running_(false),
19 task_waiter_(event_waiter),
20 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070021
Jordan Baylesa8e96772019-04-08 10:53:54 -070022TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070023
Yuri Wiitalab929b832019-06-05 17:13:15 -070024void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070025 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070026 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070027 if (task_waiter_) {
28 task_waiter_->OnTaskPosted();
29 } else {
30 run_loop_wakeup_.notify_one();
31 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070032}
33
Yuri Wiitalab929b832019-06-05 17:13:15 -070034void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
35 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070036 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070037 delayed_tasks_.emplace(
38 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070039 if (task_waiter_) {
40 task_waiter_->OnTaskPosted();
41 } else {
42 run_loop_wakeup_.notify_one();
43 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070044}
45
Max Yakimakhabf567dc2019-09-20 13:37:04 -070046bool TaskRunnerImpl::IsRunningOnTaskRunner() {
47 return task_runner_thread_id_ == std::this_thread::get_id();
48}
49
Jordan Baylesb0c191e2019-03-26 15:49:57 -070050void TaskRunnerImpl::RunUntilStopped() {
Yuri Wiitalac05ada22019-09-24 14:25:19 -070051 OSP_CHECK(!is_running_);
Max Yakimakhabf567dc2019-09-20 13:37:04 -070052 task_runner_thread_id_ = std::this_thread::get_id();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070053 is_running_ = true;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070054
Yuri Wiitalac05ada22019-09-24 14:25:19 -070055 // Main loop: Run until the |is_running_| flag is set back to false by the
56 // "quit task" posted by RequestStopSoon().
57 while (is_running_) {
58 ScheduleDelayedTasks();
59 RunTasksAfterWaiting();
60 }
61
62 // Flushing phase: Ensure all immediately-runnable tasks are run before
63 // returning. Since running some tasks might cause more immediately-runnable
64 // tasks to be posted, loop until there is no more work.
65 {
66 std::unique_lock<std::mutex> lock(task_mutex_);
67 while (!tasks_.empty()) {
68 OSP_DCHECK(running_tasks_.empty());
69 running_tasks_.swap(tasks_);
70 lock.unlock();
71 RunRunnableTasks();
72 lock.lock();
73 }
74 }
75
76 task_runner_thread_id_ = std::thread::id();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070077}
78
79void TaskRunnerImpl::RequestStopSoon() {
Yuri Wiitalac05ada22019-09-24 14:25:19 -070080 PostTask([this]() { is_running_ = false; });
Jordan Baylesb0c191e2019-03-26 15:49:57 -070081}
82
Yuri Wiitalac05ada22019-09-24 14:25:19 -070083void TaskRunnerImpl::RunTasksAfterWaiting() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070084 {
85 // Wait for the lock. If there are no current tasks, we will wait until
86 // a delayed task is ready or a task gets added to the queue.
87 auto lock = WaitForWorkAndAcquireLock();
88 if (!lock) {
89 return;
90 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070091
Yuri Wiitalab929b832019-06-05 17:13:15 -070092 OSP_DCHECK(running_tasks_.empty());
93 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -070094 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070095
Yuri Wiitalac05ada22019-09-24 14:25:19 -070096 RunRunnableTasks();
97}
98
99void TaskRunnerImpl::RunRunnableTasks() {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700100 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Max Yakimakha371bc2b2019-09-04 10:49:17 -0700101 for (TaskWithMetadata& running_task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700102 // Move the task to the stack so that its bound state is freed immediately
103 // after being run.
Max Yakimakha371bc2b2019-09-04 10:49:17 -0700104 TaskWithMetadata task = std::move(running_task);
105 task();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700106 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700107 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700108}
109
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700110void TaskRunnerImpl::ScheduleDelayedTasks() {
111 std::lock_guard<std::mutex> lock(task_mutex_);
112
113 // Getting the time can be expensive on some platforms, so only get it once.
114 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700115 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
116 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
117 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700118 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700119 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700120}
121
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700122bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
123 if (!is_running_) {
124 return true;
125 }
126
127 if (!tasks_.empty()) {
128 return true;
129 }
130
131 return !delayed_tasks_.empty() &&
Yuri Wiitalab929b832019-06-05 17:13:15 -0700132 (delayed_tasks_.begin()->first <= now_function_());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700133}
134
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700135std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
136 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700137 if (!tasks_.empty()) {
btolsch4051e722019-06-07 16:15:17 -0700138 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700139 return lock;
140 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700141
btolschd94fe622019-05-09 14:21:40 -0700142 if (task_waiter_) {
143 do {
144 Clock::duration timeout = waiter_timeout_;
145 if (!delayed_tasks_.empty()) {
146 Clock::duration next_task_delta =
Yuri Wiitalab929b832019-06-05 17:13:15 -0700147 delayed_tasks_.begin()->first - now_function_();
btolschd94fe622019-05-09 14:21:40 -0700148 if (next_task_delta < timeout) {
149 timeout = next_task_delta;
150 }
151 }
152 lock.unlock();
153 task_waiter_->WaitForTaskToBePosted(timeout);
154 lock.lock();
155 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700156 } else {
btolschd94fe622019-05-09 14:21:40 -0700157 // Pass a wait predicate to avoid lost or spurious wakeups.
Yuri Wiitalac05ada22019-09-24 14:25:19 -0700158 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
btolschd94fe622019-05-09 14:21:40 -0700159 if (!delayed_tasks_.empty()) {
160 // We don't have any work to do currently, but have some in the
161 // pipe.
162 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Yuri Wiitalab929b832019-06-05 17:13:15 -0700163 run_loop_wakeup_.wait_for(lock,
164 delayed_tasks_.begin()->first - now_function_(),
165 wait_predicate);
btolschd94fe622019-05-09 14:21:40 -0700166 } else {
167 // We don't have any work queued.
btolsch4051e722019-06-07 16:15:17 -0700168 OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
btolschd94fe622019-05-09 14:21:40 -0700169 run_loop_wakeup_.wait(lock, wait_predicate);
170 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700171 }
172
btolsch4051e722019-06-07 16:15:17 -0700173 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700174 return lock;
175}
176} // namespace platform
177} // namespace openscreen