blob: 1c2970bab6eb50fc08bd8cc8b776049792e6acef [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
btolschc92ba2f2019-04-10 11:46:01 -07005#include "platform/base/task_runner_impl.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070011
Jordan Baylesb0c191e2019-03-26 15:49:57 -070012TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
13 : now_function_(now_function), is_running_(false) {}
14
Jordan Baylesa8e96772019-04-08 10:53:54 -070015TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070016
17void TaskRunnerImpl::PostTask(Task task) {
18 std::lock_guard<std::mutex> lock(task_mutex_);
19 tasks_.push_back(std::move(task));
20 run_loop_wakeup_.notify_one();
21}
22
23void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
24 std::lock_guard<std::mutex> lock(task_mutex_);
25 delayed_tasks_.emplace(std::move(task), now_function_() + delay);
26 run_loop_wakeup_.notify_one();
27}
28
29void TaskRunnerImpl::RunUntilStopped() {
30 const bool was_running = is_running_.exchange(true);
31 OSP_CHECK(!was_running);
32
33 RunTasksUntilStopped();
34}
35
36void TaskRunnerImpl::RequestStopSoon() {
37 const bool was_running = is_running_.exchange(false);
38
39 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070040 OSP_DVLOG << "Requesting stop...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -070041 std::lock_guard<std::mutex> lock(task_mutex_);
42 run_loop_wakeup_.notify_one();
43 }
44}
45
46void TaskRunnerImpl::RunUntilIdleForTesting() {
47 ScheduleDelayedTasks();
48 RunCurrentTasksForTesting();
49}
50
51void TaskRunnerImpl::RunCurrentTasksForTesting() {
52 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070053 {
54 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
55 // take the lock and drain the tasks_ queue. This allows tests to avoid
56 // having to do any multithreading to interact with the queue.
57 std::unique_lock<std::mutex> lock(task_mutex_);
58 tasks_.swap(current_tasks);
59 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070060
61 for (Task& task : current_tasks) {
62 task();
63 }
64}
65
66void TaskRunnerImpl::RunCurrentTasksBlocking() {
67 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070068 {
69 // Wait for the lock. If there are no current tasks, we will wait until
70 // a delayed task is ready or a task gets added to the queue.
71 auto lock = WaitForWorkAndAcquireLock();
72 if (!lock) {
73 return;
74 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070075
Jordan Baylesa8e96772019-04-08 10:53:54 -070076 tasks_.swap(current_tasks);
77 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070078
79 for (Task& task : current_tasks) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070080 OSP_DVLOG << "Running " << current_tasks.size() << " current tasks...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -070081 task();
82 }
83}
84
85void TaskRunnerImpl::RunTasksUntilStopped() {
86 while (is_running_) {
87 ScheduleDelayedTasks();
88 RunCurrentTasksBlocking();
89 }
90}
91
92void TaskRunnerImpl::ScheduleDelayedTasks() {
93 std::lock_guard<std::mutex> lock(task_mutex_);
94
95 // Getting the time can be expensive on some platforms, so only get it once.
96 const auto current_time = now_function_();
97 while (!delayed_tasks_.empty() &&
Jordan Bayles5d72bc22019-04-09 13:33:52 -070098 (delayed_tasks_.top().runnable_after <= current_time)) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070099 tasks_.push_back(std::move(delayed_tasks_.top().task));
100 delayed_tasks_.pop();
101 }
102}
103
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700104bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
105 if (!is_running_) {
106 return true;
107 }
108
109 if (!tasks_.empty()) {
110 return true;
111 }
112
113 return !delayed_tasks_.empty() &&
114 (delayed_tasks_.top().runnable_after <= now_function_());
115}
116
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700117std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700118 // These checks are redundant, as they are a subset of predicates in the
119 // below wait predicate. However, this is more readable and a slight
120 // optimization, as we don't need to take a lock if we aren't running.
121 if (!is_running_) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700122 OSP_DVLOG << "TaskRunner not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700123 return {};
124 }
125
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700126 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700127 if (!tasks_.empty()) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700128 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700129 return lock;
130 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700131
132 // Pass a wait predicate to avoid lost or spurious wakeups.
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700133 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700134 if (!delayed_tasks_.empty()) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700135 // We don't have any work to do currently, but have some in the
136 // pipe.
137 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700138 run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().runnable_after,
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700139 wait_predicate);
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700140 } else {
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700141 // We don't have any work queued.
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700142 OSP_DVLOG << "TaskRunner waiting for lock...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700143 run_loop_wakeup_.wait(lock, wait_predicate);
144 }
145
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700146 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700147 return lock;
148}
149} // namespace platform
150} // namespace openscreen