blob: ad061c2addec47dba3683ec0915752f7aac097a3 [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
5#include "api/impl/task_runner_impl.h"
6
7#include "platform/api/logging.h"
8
9namespace openscreen {
10namespace platform {
11TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
12 : now_function_(now_function), is_running_(false) {}
13
Jordan Baylesa8e96772019-04-08 10:53:54 -070014TaskRunnerImpl::~TaskRunnerImpl() = default;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070015
16void TaskRunnerImpl::PostTask(Task task) {
17 std::lock_guard<std::mutex> lock(task_mutex_);
18 tasks_.push_back(std::move(task));
19 run_loop_wakeup_.notify_one();
20}
21
22void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
23 std::lock_guard<std::mutex> lock(task_mutex_);
24 delayed_tasks_.emplace(std::move(task), now_function_() + delay);
25 run_loop_wakeup_.notify_one();
26}
27
28void TaskRunnerImpl::RunUntilStopped() {
29 const bool was_running = is_running_.exchange(true);
30 OSP_CHECK(!was_running);
31
32 RunTasksUntilStopped();
33}
34
35void TaskRunnerImpl::RequestStopSoon() {
36 const bool was_running = is_running_.exchange(false);
37
38 if (was_running) {
39 std::lock_guard<std::mutex> lock(task_mutex_);
40 run_loop_wakeup_.notify_one();
41 }
42}
43
44void TaskRunnerImpl::RunUntilIdleForTesting() {
45 ScheduleDelayedTasks();
46 RunCurrentTasksForTesting();
47}
48
49void TaskRunnerImpl::RunCurrentTasksForTesting() {
50 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070051 {
52 // Unlike in the RunCurrentTasksBlocking method, here we just immediately
53 // take the lock and drain the tasks_ queue. This allows tests to avoid
54 // having to do any multithreading to interact with the queue.
55 std::unique_lock<std::mutex> lock(task_mutex_);
56 tasks_.swap(current_tasks);
57 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070058
59 for (Task& task : current_tasks) {
60 task();
61 }
62}
63
64void TaskRunnerImpl::RunCurrentTasksBlocking() {
65 std::deque<Task> current_tasks;
Jordan Baylesa8e96772019-04-08 10:53:54 -070066 {
67 // Wait for the lock. If there are no current tasks, we will wait until
68 // a delayed task is ready or a task gets added to the queue.
69 auto lock = WaitForWorkAndAcquireLock();
70 if (!lock) {
71 return;
72 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070073
Jordan Baylesa8e96772019-04-08 10:53:54 -070074 tasks_.swap(current_tasks);
75 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070076
77 for (Task& task : current_tasks) {
78 task();
79 }
80}
81
82void TaskRunnerImpl::RunTasksUntilStopped() {
83 while (is_running_) {
84 ScheduleDelayedTasks();
85 RunCurrentTasksBlocking();
86 }
87}
88
89void TaskRunnerImpl::ScheduleDelayedTasks() {
90 std::lock_guard<std::mutex> lock(task_mutex_);
91
92 // Getting the time can be expensive on some platforms, so only get it once.
93 const auto current_time = now_function_();
94 while (!delayed_tasks_.empty() &&
Jordan Baylesa8e96772019-04-08 10:53:54 -070095 (delayed_tasks_.top().runnable_after < current_time)) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070096 tasks_.push_back(std::move(delayed_tasks_.top().task));
97 delayed_tasks_.pop();
98 }
99}
100
101std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700102 // These checks are redundant, as they are a subset of predicates in the
103 // below wait predicate. However, this is more readable and a slight
104 // optimization, as we don't need to take a lock if we aren't running.
105 if (!is_running_) {
106 return {};
107 }
108
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700109 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700110 if (!tasks_.empty()) {
111 return lock;
112 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700113
114 // Pass a wait predicate to avoid lost or spurious wakeups.
115 const auto wait_predicate = [this] {
116 // Either we got woken up because we aren't running
117 // (probably just to end the thread), or we are running and have tasks to
Jordan Baylesa8e96772019-04-08 10:53:54 -0700118 // do or schedule.
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700119 return !this->is_running_ || !this->tasks_.empty() ||
120 !this->delayed_tasks_.empty();
121 };
122
Jordan Baylesa8e96772019-04-08 10:53:54 -0700123 // TODO(jophba): find a predicate method that is compatible with our
124 // fake clock, for easier use with testing.
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700125 // We don't have any work to do currently, but know we have some in the pipe.
126 if (!delayed_tasks_.empty()) {
Jordan Baylesa8e96772019-04-08 10:53:54 -0700127 run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().runnable_after,
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700128 wait_predicate);
129
130 // We don't have any work queued.
131 } else if (tasks_.empty()) {
132 run_loop_wakeup_.wait(lock, wait_predicate);
133 }
134
135 return lock;
136}
137} // namespace platform
138} // namespace openscreen