blob: 22906d6f4344475989f4774e486f3959123b172f [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file
4
#include "platform/base/task_runner_impl.h"

#include <condition_variable>

#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070011
btolschd94fe622019-05-09 14:21:40 -070012TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
13 TaskWaiter* event_waiter,
14 Clock::duration waiter_timeout)
15 : now_function_(now_function),
16 is_running_(false),
17 task_waiter_(event_waiter),
18 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070019
Jordan Baylesa8e96772019-04-08 10:53:54 -070020TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070021
22void TaskRunnerImpl::PostTask(Task task) {
23 std::lock_guard<std::mutex> lock(task_mutex_);
24 tasks_.push_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070025 if (task_waiter_) {
26 task_waiter_->OnTaskPosted();
27 } else {
28 run_loop_wakeup_.notify_one();
29 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070030}
31
32void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
33 std::lock_guard<std::mutex> lock(task_mutex_);
34 delayed_tasks_.emplace(std::move(task), now_function_() + delay);
btolschd94fe622019-05-09 14:21:40 -070035 if (task_waiter_) {
36 task_waiter_->OnTaskPosted();
37 } else {
38 run_loop_wakeup_.notify_one();
39 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070040}
41
42void TaskRunnerImpl::RunUntilStopped() {
43 const bool was_running = is_running_.exchange(true);
44 OSP_CHECK(!was_running);
45
46 RunTasksUntilStopped();
47}
48
49void TaskRunnerImpl::RequestStopSoon() {
50 const bool was_running = is_running_.exchange(false);
51
52 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070053 OSP_DVLOG << "Requesting stop...";
btolschd94fe622019-05-09 14:21:40 -070054 if (task_waiter_) {
55 task_waiter_->OnTaskPosted();
56 } else {
57 std::lock_guard<std::mutex> lock(task_mutex_);
58 run_loop_wakeup_.notify_one();
59 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070060 }
61}
62
63void TaskRunnerImpl::RunUntilIdleForTesting() {
64 ScheduleDelayedTasks();
65 RunCurrentTasksForTesting();
66}
67
68void TaskRunnerImpl::RunCurrentTasksForTesting() {
69 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070070 {
71 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
72 // take the lock and drain the tasks_ queue. This allows tests to avoid
73 // having to do any multithreading to interact with the queue.
74 std::unique_lock<std::mutex> lock(task_mutex_);
75 tasks_.swap(current_tasks);
76 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070077
78 for (Task& task : current_tasks) {
79 task();
80 }
81}
82
83void TaskRunnerImpl::RunCurrentTasksBlocking() {
84 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070085 {
86 // Wait for the lock. If there are no current tasks, we will wait until
87 // a delayed task is ready or a task gets added to the queue.
88 auto lock = WaitForWorkAndAcquireLock();
89 if (!lock) {
90 return;
91 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070092
Jordan Baylesa8e96772019-04-08 10:53:54 -070093 tasks_.swap(current_tasks);
94 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070095
96 for (Task& task : current_tasks) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070097 OSP_DVLOG << "Running " << current_tasks.size() << " current tasks...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -070098 task();
99 }
100}
101
102void TaskRunnerImpl::RunTasksUntilStopped() {
103 while (is_running_) {
104 ScheduleDelayedTasks();
105 RunCurrentTasksBlocking();
106 }
107}
108
109void TaskRunnerImpl::ScheduleDelayedTasks() {
110 std::lock_guard<std::mutex> lock(task_mutex_);
111
112 // Getting the time can be expensive on some platforms, so only get it once.
113 const auto current_time = now_function_();
114 while (!delayed_tasks_.empty() &&
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700115 (delayed_tasks_.top().runnable_after <= current_time)) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700116 tasks_.push_back(std::move(delayed_tasks_.top().task));
117 delayed_tasks_.pop();
118 }
119}
120
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700121bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
122 if (!is_running_) {
123 return true;
124 }
125
126 if (!tasks_.empty()) {
127 return true;
128 }
129
130 return !delayed_tasks_.empty() &&
131 (delayed_tasks_.top().runnable_after <= now_function_());
132}
133
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700134std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700135 // These checks are redundant, as they are a subset of predicates in the
136 // below wait predicate. However, this is more readable and a slight
137 // optimization, as we don't need to take a lock if we aren't running.
138 if (!is_running_) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700139 OSP_DVLOG << "TaskRunner not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700140 return {};
141 }
142
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700143 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700144 if (!tasks_.empty()) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700145 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700146 return lock;
147 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700148
btolschd94fe622019-05-09 14:21:40 -0700149 if (task_waiter_) {
150 do {
151 Clock::duration timeout = waiter_timeout_;
152 if (!delayed_tasks_.empty()) {
153 Clock::duration next_task_delta =
154 delayed_tasks_.top().runnable_after - now_function_();
155 if (next_task_delta < timeout) {
156 timeout = next_task_delta;
157 }
158 }
159 lock.unlock();
160 task_waiter_->WaitForTaskToBePosted(timeout);
161 lock.lock();
162 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700163 } else {
btolschd94fe622019-05-09 14:21:40 -0700164 // Pass a wait predicate to avoid lost or spurious wakeups.
165 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
166 if (!delayed_tasks_.empty()) {
167 // We don't have any work to do currently, but have some in the
168 // pipe.
169 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
170 run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().runnable_after,
171 wait_predicate);
172 } else {
173 // We don't have any work queued.
174 OSP_DVLOG << "TaskRunner waiting for lock...";
175 run_loop_wakeup_.wait(lock, wait_predicate);
176 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700177 }
178
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700179 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700180 return lock;
181}
182} // namespace platform
183} // namespace openscreen