blob: 695fd2a567beefbb5db4c3e39ff429fde1d80e74 [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
5#include "api/impl/task_runner_impl.h"
6
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
11TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
12 : now_function_(now_function), is_running_(false) {}
13
Jordan Baylesa8e96772019-04-08 10:53:54 -070014TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070015
16void TaskRunnerImpl::PostTask(Task task) {
17 std::lock_guard<std::mutex> lock(task_mutex_);
18 tasks_.push_back(std::move(task));
19 run_loop_wakeup_.notify_one();
20}
21
22void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
23 std::lock_guard<std::mutex> lock(task_mutex_);
24 delayed_tasks_.emplace(std::move(task), now_function_() + delay);
25 run_loop_wakeup_.notify_one();
26}
27
28void TaskRunnerImpl::RunUntilStopped() {
29 const bool was_running = is_running_.exchange(true);
30 OSP_CHECK(!was_running);
31
32 RunTasksUntilStopped();
33}
34
35void TaskRunnerImpl::RequestStopSoon() {
36 const bool was_running = is_running_.exchange(false);
37
38 if (was_running) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070039 OSP_DVLOG << "Requesting stop...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -070040 std::lock_guard<std::mutex> lock(task_mutex_);
41 run_loop_wakeup_.notify_one();
42 }
43}
44
45void TaskRunnerImpl::RunUntilIdleForTesting() {
46 ScheduleDelayedTasks();
47 RunCurrentTasksForTesting();
48}
49
50void TaskRunnerImpl::RunCurrentTasksForTesting() {
51 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070052 {
53 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
54 // take the lock and drain the tasks_ queue. This allows tests to avoid
55 // having to do any multithreading to interact with the queue.
56 std::unique_lock<std::mutex> lock(task_mutex_);
57 tasks_.swap(current_tasks);
58 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070059
60 for (Task& task : current_tasks) {
61 task();
62 }
63}
64
65void TaskRunnerImpl::RunCurrentTasksBlocking() {
66 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070067 {
68 // Wait for the lock. If there are no current tasks, we will wait until
69 // a delayed task is ready or a task gets added to the queue.
70 auto lock = WaitForWorkAndAcquireLock();
71 if (!lock) {
72 return;
73 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070074
Jordan Baylesa8e96772019-04-08 10:53:54 -070075 tasks_.swap(current_tasks);
76 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070077
78 for (Task& task : current_tasks) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -070079 OSP_DVLOG << "Running " << current_tasks.size() << " current tasks...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -070080 task();
81 }
82}
83
84void TaskRunnerImpl::RunTasksUntilStopped() {
85 while (is_running_) {
86 ScheduleDelayedTasks();
87 RunCurrentTasksBlocking();
88 }
89}
90
91void TaskRunnerImpl::ScheduleDelayedTasks() {
92 std::lock_guard<std::mutex> lock(task_mutex_);
93
94 // Getting the time can be expensive on some platforms, so only get it once.
95 const auto current_time = now_function_();
96 while (!delayed_tasks_.empty() &&
Jordan Bayles5d72bc22019-04-09 13:33:52 -070097 (delayed_tasks_.top().runnable_after <= current_time)) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070098 tasks_.push_back(std::move(delayed_tasks_.top().task));
99 delayed_tasks_.pop();
100 }
101}
102
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700103bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
104 if (!is_running_) {
105 return true;
106 }
107
108 if (!tasks_.empty()) {
109 return true;
110 }
111
112 return !delayed_tasks_.empty() &&
113 (delayed_tasks_.top().runnable_after <= now_function_());
114}
115
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700116std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700117 // These checks are redundant, as they are a subset of predicates in the
118 // below wait predicate. However, this is more readable and a slight
119 // optimization, as we don't need to take a lock if we aren't running.
120 if (!is_running_) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700121 OSP_DVLOG << "TaskRunner not running. Returning empty lock.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700122 return {};
123 }
124
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700125 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700126 if (!tasks_.empty()) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700127 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700128 return lock;
129 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700130
131 // Pass a wait predicate to avoid lost or spurious wakeups.
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700132 const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700133 if (!delayed_tasks_.empty()) {
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700134 // We don't have any work to do currently, but have some in the
135 // pipe.
136 OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
Jordan Baylesa8e96772019-04-08 10:53:54 -0700137 run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().runnable_after,
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700138 wait_predicate);
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700139 } else {
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700140 // We don't have any work queued.
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700141 OSP_DVLOG << "TaskRunner waiting for lock...";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700142 run_loop_wakeup_.wait(lock, wait_predicate);
143 }
144
Jordan Bayles5d72bc22019-04-09 13:33:52 -0700145 OSP_DVLOG << "TaskRunner lock acquired.";
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700146 return lock;
147}
148} // namespace platform
149} // namespace openscreen