blob: 651e1fe01eff68cbe433c5ba57cda9c4eb4f58ff [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070011
Ryan Keaneefab2ed2019-07-22 12:36:53 -070012TaskRunnerImpl::TaskWithMetadata::TaskWithMetadata(Task task)
13 : task_(std::move(task)), trace_ids_(TRACE_HIERARCHY) {}
14
15void TaskRunnerImpl::TaskWithMetadata::operator()() {
16 TRACE_SET_HIERARCHY(trace_ids_);
17 std::move(task_)();
18}
19
btolschd94fe622019-05-09 14:21:40 -070020TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
21 TaskWaiter* event_waiter,
22 Clock::duration waiter_timeout)
23 : now_function_(now_function),
24 is_running_(false),
25 task_waiter_(event_waiter),
26 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070027
Jordan Baylesa8e96772019-04-08 10:53:54 -070028TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070029
Yuri Wiitalab929b832019-06-05 17:13:15 -070030void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070031 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070032 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070033 if (task_waiter_) {
34 task_waiter_->OnTaskPosted();
35 } else {
36 run_loop_wakeup_.notify_one();
37 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070038}
39
Yuri Wiitalab929b832019-06-05 17:13:15 -070040void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
41 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070042 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070043 delayed_tasks_.emplace(
44 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070045 if (task_waiter_) {
46 task_waiter_->OnTaskPosted();
47 } else {
48 run_loop_wakeup_.notify_one();
49 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070050}
51
52void TaskRunnerImpl::RunUntilStopped() {
53 const bool was_running = is_running_.exchange(true);
54 OSP_CHECK(!was_running);
55
56 RunTasksUntilStopped();
57}
58
59void TaskRunnerImpl::RequestStopSoon() {
60 const bool was_running = is_running_.exchange(false);
61
62 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070063 OSP_DVLOG << "Requesting stop...";
btolschd94fe622019-05-09 14:21:40 -070064 if (task_waiter_) {
65 task_waiter_->OnTaskPosted();
66 } else {
67 std::lock_guard<std::mutex> lock(task_mutex_);
68 run_loop_wakeup_.notify_one();
69 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070070 }
71}
72
73void TaskRunnerImpl::RunUntilIdleForTesting() {
74 ScheduleDelayedTasks();
75 RunCurrentTasksForTesting();
76}
77
78void TaskRunnerImpl::RunCurrentTasksForTesting() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070079 {
80 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
81 // take the lock and drain the tasks_ queue. This allows tests to avoid
82 // having to do any multithreading to interact with the queue.
83 std::unique_lock<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070084 OSP_DCHECK(running_tasks_.empty());
85 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -070086 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070087
Ryan Keaneefab2ed2019-07-22 12:36:53 -070088 for (TaskWithMetadata& task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -070089 // Move the task to the stack so that its bound state is freed immediately
90 // after being run.
91 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070092 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070093 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070094}
95
96void TaskRunnerImpl::RunCurrentTasksBlocking() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070097 {
98 // Wait for the lock. If there are no current tasks, we will wait until
99 // a delayed task is ready or a task gets added to the queue.
100 auto lock = WaitForWorkAndAcquireLock();
101 if (!lock) {
102 return;
103 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700104
Yuri Wiitalab929b832019-06-05 17:13:15 -0700105 OSP_DCHECK(running_tasks_.empty());
106 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700107 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700108
Yuri Wiitalab929b832019-06-05 17:13:15 -0700109 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Ryan Keaneefab2ed2019-07-22 12:36:53 -0700110 for (TaskWithMetadata& task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700111 // Move the task to the stack so that its bound state is freed immediately
112 // after being run.
113 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700114 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700115 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700116}
117
118void TaskRunnerImpl::RunTasksUntilStopped() {
119 while (is_running_) {
120 ScheduleDelayedTasks();
121 RunCurrentTasksBlocking();
122 }
123}
124
125void TaskRunnerImpl::ScheduleDelayedTasks() {
126 std::lock_guard<std::mutex> lock(task_mutex_);
127
128 // Getting the time can be expensive on some platforms, so only get it once.
129 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700130 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
131 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
132 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700133 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700134 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700135}
136
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700137bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
138 if (!is_running_) {
139 return true;
140 }
141
142 if (!tasks_.empty()) {
143 return true;
144 }
145
146 return !delayed_tasks_.empty() &&
Yuri Wiitalab929b832019-06-05 17:13:15 -0700147 (delayed_tasks_.begin()->first <= now_function_());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700148}
149
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700150std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700151 // These checks are redundant, as they are a subset of predicates in the
152 // below wait predicate. However, this is more readable and a slight
153 // optimization, as we don't need to take a lock if we aren't running.
154 if (!is_running_) {
btolsch4051e722019-06-07 16:15:17 -0700155 OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700156 return {};
157 }
158
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700159 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700160 if (!tasks_.empty()) {
btolsch4051e722019-06-07 16:15:17 -0700161 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700162 return lock;
163 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700164
btolschd94fe622019-05-09 14:21:40 -0700165 if (task_waiter_) {
166 do {
167 Clock::duration timeout = waiter_timeout_;
168 if (!delayed_tasks_.empty()) {
169 Clock::duration next_task_delta =
Yuri Wiitalab929b832019-06-05 17:13:15 -0700170 delayed_tasks_.begin()->first - now_function_();
btolschd94fe622019-05-09 14:21:40 -0700171 if (next_task_delta < timeout) {
172 timeout = next_task_delta;
173 }
174 }
175 lock.unlock();
176 task_waiter_->WaitForTaskToBePosted(timeout);
177 lock.lock();
178 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700179 } else {
btolschd94fe622019-05-09 14:21:40 -0700180 // Pass a wait predicate to avoid lost or spurious wakeups.
btolschd94fe622019-05-09 14:21:40 -0700181 if (!delayed_tasks_.empty()) {
182 // We don't have any work to do currently, but have some in the
183 // pipe.
btolsch61045d12019-05-09 16:33:58 -0700184 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
btolschd94fe622019-05-09 14:21:40 -0700185 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Yuri Wiitalab929b832019-06-05 17:13:15 -0700186 run_loop_wakeup_.wait_for(lock,
187 delayed_tasks_.begin()->first - now_function_(),
188 wait_predicate);
btolschd94fe622019-05-09 14:21:40 -0700189 } else {
190 // We don't have any work queued.
btolsch61045d12019-05-09 16:33:58 -0700191 const auto wait_predicate = [this] {
192 return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
193 };
btolsch4051e722019-06-07 16:15:17 -0700194 OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
btolschd94fe622019-05-09 14:21:40 -0700195 run_loop_wakeup_.wait(lock, wait_predicate);
196 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700197 }
198
btolsch4051e722019-06-07 16:15:17 -0700199 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700200 return lock;
201}
202} // namespace platform
203} // namespace openscreen