blob: 89886dc7ac339ae1ce969db42f6361f02aaafcde [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070011
btolschd94fe622019-05-09 14:21:40 -070012TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
13 TaskWaiter* event_waiter,
14 Clock::duration waiter_timeout)
15 : now_function_(now_function),
16 is_running_(false),
17 task_waiter_(event_waiter),
18 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070019
Jordan Baylesa8e96772019-04-08 10:53:54 -070020TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070021
Yuri Wiitalab929b832019-06-05 17:13:15 -070022void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070023 std::lock_guard<std::mutex> lock(task_mutex_);
24 tasks_.push_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070025 if (task_waiter_) {
26 task_waiter_->OnTaskPosted();
27 } else {
28 run_loop_wakeup_.notify_one();
29 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070030}
31
Yuri Wiitalab929b832019-06-05 17:13:15 -070032void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
33 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070034 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070035 delayed_tasks_.emplace(
36 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070037 if (task_waiter_) {
38 task_waiter_->OnTaskPosted();
39 } else {
40 run_loop_wakeup_.notify_one();
41 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070042}
43
44void TaskRunnerImpl::RunUntilStopped() {
45 const bool was_running = is_running_.exchange(true);
46 OSP_CHECK(!was_running);
47
48 RunTasksUntilStopped();
49}
50
51void TaskRunnerImpl::RequestStopSoon() {
52 const bool was_running = is_running_.exchange(false);
53
54 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070055 OSP_DVLOG << "Requesting stop...";
btolschd94fe622019-05-09 14:21:40 -070056 if (task_waiter_) {
57 task_waiter_->OnTaskPosted();
58 } else {
59 std::lock_guard<std::mutex> lock(task_mutex_);
60 run_loop_wakeup_.notify_one();
61 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070062 }
63}
64
65void TaskRunnerImpl::RunUntilIdleForTesting() {
66 ScheduleDelayedTasks();
67 RunCurrentTasksForTesting();
68}
69
70void TaskRunnerImpl::RunCurrentTasksForTesting() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070071 {
72 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
73 // take the lock and drain the tasks_ queue. This allows tests to avoid
74 // having to do any multithreading to interact with the queue.
75 std::unique_lock<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070076 OSP_DCHECK(running_tasks_.empty());
77 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -070078 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070079
Yuri Wiitalab929b832019-06-05 17:13:15 -070080 for (Task& task : running_tasks_) {
81 // Move the task to the stack so that its bound state is freed immediately
82 // after being run.
83 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070084 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070085 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070086}
87
88void TaskRunnerImpl::RunCurrentTasksBlocking() {
Jordan Baylesa8e96772019-04-08 10:53:54 -070089 {
90 // Wait for the lock. If there are no current tasks, we will wait until
91 // a delayed task is ready or a task gets added to the queue.
92 auto lock = WaitForWorkAndAcquireLock();
93 if (!lock) {
94 return;
95 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070096
Yuri Wiitalab929b832019-06-05 17:13:15 -070097 OSP_DCHECK(running_tasks_.empty());
98 running_tasks_.swap(tasks_);
Jordan Baylesa8e96772019-04-08 10:53:54 -070099 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700100
Yuri Wiitalab929b832019-06-05 17:13:15 -0700101 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
102 for (Task& task : running_tasks_) {
103 // Move the task to the stack so that its bound state is freed immediately
104 // after being run.
105 std::move(task)();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700106 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700107 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700108}
109
110void TaskRunnerImpl::RunTasksUntilStopped() {
111 while (is_running_) {
112 ScheduleDelayedTasks();
113 RunCurrentTasksBlocking();
114 }
115}
116
117void TaskRunnerImpl::ScheduleDelayedTasks() {
118 std::lock_guard<std::mutex> lock(task_mutex_);
119
120 // Getting the time can be expensive on some platforms, so only get it once.
121 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700122 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
123 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
124 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700125 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700126 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700127}
128
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700129bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
130 if (!is_running_) {
131 return true;
132 }
133
134 if (!tasks_.empty()) {
135 return true;
136 }
137
138 return !delayed_tasks_.empty() &&
Yuri Wiitalab929b832019-06-05 17:13:15 -0700139 (delayed_tasks_.begin()->first <= now_function_());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700140}
141
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700142std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700143 // These checks are redundant, as they are a subset of predicates in the
144 // below wait predicate. However, this is more readable and a slight
145 // optimization, as we don't need to take a lock if we aren't running.
146 if (!is_running_) {
btolsch4051e722019-06-07 16:15:17 -0700147 OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700148 return {};
149 }
150
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700151 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700152 if (!tasks_.empty()) {
btolsch4051e722019-06-07 16:15:17 -0700153 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700154 return lock;
155 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700156
btolschd94fe622019-05-09 14:21:40 -0700157 if (task_waiter_) {
158 do {
159 Clock::duration timeout = waiter_timeout_;
160 if (!delayed_tasks_.empty()) {
161 Clock::duration next_task_delta =
Yuri Wiitalab929b832019-06-05 17:13:15 -0700162 delayed_tasks_.begin()->first - now_function_();
btolschd94fe622019-05-09 14:21:40 -0700163 if (next_task_delta < timeout) {
164 timeout = next_task_delta;
165 }
166 }
167 lock.unlock();
168 task_waiter_->WaitForTaskToBePosted(timeout);
169 lock.lock();
170 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700171 } else {
btolschd94fe622019-05-09 14:21:40 -0700172 // Pass a wait predicate to avoid lost or spurious wakeups.
btolschd94fe622019-05-09 14:21:40 -0700173 if (!delayed_tasks_.empty()) {
174 // We don't have any work to do currently, but have some in the
175 // pipe.
btolsch61045d12019-05-09 16:33:58 -0700176 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
btolschd94fe622019-05-09 14:21:40 -0700177 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Yuri Wiitalab929b832019-06-05 17:13:15 -0700178 run_loop_wakeup_.wait_for(lock,
179 delayed_tasks_.begin()->first - now_function_(),
180 wait_predicate);
btolschd94fe622019-05-09 14:21:40 -0700181 } else {
182 // We don't have any work queued.
btolsch61045d12019-05-09 16:33:58 -0700183 const auto wait_predicate = [this] {
184 return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
185 };
btolsch4051e722019-06-07 16:15:17 -0700186 OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
btolschd94fe622019-05-09 14:21:40 -0700187 run_loop_wakeup_.wait(lock, wait_predicate);
188 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700189 }
190
btolsch4051e722019-06-07 16:15:17 -0700191 OSP_DVLOG << "TaskRunnerImpl lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700192 return lock;
193}
194} // namespace platform
195} // namespace openscreen