blob: 6c363158bab5563e6305a3b2bea2c7237dddc67f [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
btolschc92ba2f2019-04-10 11:46:01 -07005#include "platform/base/task_runner_impl.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070011
btolschd94fe622019-05-09 14:21:40 -070012TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
13 TaskWaiter* event_waiter,
14 Clock::duration waiter_timeout)
15 : now_function_(now_function),
16 is_running_(false),
17 task_waiter_(event_waiter),
18 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070019
Jordan Baylesa8e96772019-04-08 10:53:54 -070020TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070021
22void TaskRunnerImpl::PostTask(Task task) {
23 std::lock_guard<std::mutex> lock(task_mutex_);
24 tasks_.push_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070025 if (task_waiter_) {
26 task_waiter_->OnTaskPosted();
27 } else {
28 run_loop_wakeup_.notify_one();
29 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070030}
31
32void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
33 std::lock_guard<std::mutex> lock(task_mutex_);
34 delayed_tasks_.emplace(std::move(task), now_function_() + delay);
btolschd94fe622019-05-09 14:21:40 -070035 if (task_waiter_) {
36 task_waiter_->OnTaskPosted();
37 } else {
38 run_loop_wakeup_.notify_one();
39 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070040}
41
42void TaskRunnerImpl::RunUntilStopped() {
43 const bool was_running = is_running_.exchange(true);
44 OSP_CHECK(!was_running);
45
46 RunTasksUntilStopped();
47}
48
49void TaskRunnerImpl::RequestStopSoon() {
50 const bool was_running = is_running_.exchange(false);
51
52 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070053 OSP_DVLOG << "Requesting stop...";
btolschd94fe622019-05-09 14:21:40 -070054 if (task_waiter_) {
55 task_waiter_->OnTaskPosted();
56 } else {
57 std::lock_guard<std::mutex> lock(task_mutex_);
58 run_loop_wakeup_.notify_one();
59 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070060 }
61}
62
63void TaskRunnerImpl::RunUntilIdleForTesting() {
64 ScheduleDelayedTasks();
65 RunCurrentTasksForTesting();
66}
67
68void TaskRunnerImpl::RunCurrentTasksForTesting() {
69 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070070 {
71 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
72 // take the lock and drain the tasks_ queue. This allows tests to avoid
73 // having to do any multithreading to interact with the queue.
74 std::unique_lock<std::mutex> lock(task_mutex_);
75 tasks_.swap(current_tasks);
76 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070077
78 for (Task& task : current_tasks) {
79 task();
80 }
81}
82
83void TaskRunnerImpl::RunCurrentTasksBlocking() {
84 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070085 {
86 // Wait for the lock. If there are no current tasks, we will wait until
87 // a delayed task is ready or a task gets added to the queue.
88 auto lock = WaitForWorkAndAcquireLock();
89 if (!lock) {
90 return;
91 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070092
Jordan Baylesa8e96772019-04-08 10:53:54 -070093 tasks_.swap(current_tasks);
94 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070095
96 for (Task& task : current_tasks) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070097 OSP_DVLOG << "Running " << current_tasks.size() << " current tasks...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -070098 task();
99 }
100}
101
102void TaskRunnerImpl::RunTasksUntilStopped() {
103 while (is_running_) {
104 ScheduleDelayedTasks();
105 RunCurrentTasksBlocking();
106 }
107}
108
109void TaskRunnerImpl::ScheduleDelayedTasks() {
110 std::lock_guard<std::mutex> lock(task_mutex_);
111
112 // Getting the time can be expensive on some platforms, so only get it once.
113 const auto current_time = now_function_();
114 while (!delayed_tasks_.empty() &&
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700115 (delayed_tasks_.top().runnable_after <= current_time)) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700116 tasks_.push_back(std::move(delayed_tasks_.top().task));
117 delayed_tasks_.pop();
118 }
119}
120
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700121bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
122 if (!is_running_) {
123 return true;
124 }
125
126 if (!tasks_.empty()) {
127 return true;
128 }
129
130 return !delayed_tasks_.empty() &&
131 (delayed_tasks_.top().runnable_after <= now_function_());
132}
133
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700134std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700135 // These checks are redundant, as they are a subset of predicates in the
136 // below wait predicate. However, this is more readable and a slight
137 // optimization, as we don't need to take a lock if we aren't running.
138 if (!is_running_) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700139 OSP_DVLOG << "TaskRunner not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700140 return {};
141 }
142
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700143 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700144 if (!tasks_.empty()) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700145 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700146 return lock;
147 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700148
btolschd94fe622019-05-09 14:21:40 -0700149 if (task_waiter_) {
150 do {
151 Clock::duration timeout = waiter_timeout_;
152 if (!delayed_tasks_.empty()) {
153 Clock::duration next_task_delta =
154 delayed_tasks_.top().runnable_after - now_function_();
155 if (next_task_delta < timeout) {
156 timeout = next_task_delta;
157 }
158 }
159 lock.unlock();
160 task_waiter_->WaitForTaskToBePosted(timeout);
161 lock.lock();
162 } while (!ShouldWakeUpRunLoop());
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700163 } else {
btolschd94fe622019-05-09 14:21:40 -0700164 // Pass a wait predicate to avoid lost or spurious wakeups.
btolschd94fe622019-05-09 14:21:40 -0700165 if (!delayed_tasks_.empty()) {
166 // We don't have any work to do currently, but have some in the
167 // pipe.
btolsch61045d12019-05-09 16:33:58 -0700168 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
btolschd94fe622019-05-09 14:21:40 -0700169 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
btolsch61045d12019-05-09 16:33:58 -0700170 run_loop_wakeup_.wait_for(
171 lock, delayed_tasks_.top().runnable_after - now_function_(),
172 wait_predicate);
btolschd94fe622019-05-09 14:21:40 -0700173 } else {
174 // We don't have any work queued.
btolsch61045d12019-05-09 16:33:58 -0700175 const auto wait_predicate = [this] {
176 return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
177 };
btolschd94fe622019-05-09 14:21:40 -0700178 OSP_DVLOG << "TaskRunner waiting for lock...";
179 run_loop_wakeup_.wait(lock, wait_predicate);
180 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700181 }
182
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700183 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700184 return lock;
185}
186} // namespace platform
187} // namespace openscreen