blob: f306e79213032e82218a361c17db5fd6a587b176 [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
Jordan Baylesee059e92020-09-21 16:38:51 -07003// found in the LICENSE file.
Jordan Baylesb0c191e2019-03-26 15:49:57 -07004
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Yuri Wiitala341085a2020-03-11 17:13:54 -07007#include <csignal>
Ryan Keanea973b512019-07-29 15:50:39 -07008#include <thread>
9
Jordan Baylesa44d04a2020-05-06 16:59:39 -070010#include "util/osp_logging.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -070011
12namespace openscreen {
btolschc92ba2f2019-04-10 11:46:01 -070013
namespace {

// Values stored in |g_signal_state|. Kept as an unscoped enum so the
// enumerators convert implicitly to/from the integral flag below.
enum SignalState : int {
  kNotRunning,   // RunUntilSignaled() is not currently executing.
  kNotSignaled,  // RunUntilSignaled() is executing; no signal seen yet.
  kSignaled      // The installed handler ran for SIGINT or SIGTERM.
};

// This is mutated by the signal handler installed by RunUntilSignaled(), and is
// checked by RunUntilStopped().
//
// Per the C++ spec, a signal handler may only portably write to objects of
// static storage duration that are lock-free atomics or of type
// `volatile std::sig_atomic_t`; a volatile enum is not covered by that
// guarantee. Hence the flag is a `volatile std::sig_atomic_t` holding one of
// the SignalState values.
volatile std::sig_atomic_t g_signal_state = kNotRunning;

// Signal handler installed by RunUntilSignaled() for SIGINT and SIGTERM. It
// only records that a signal arrived; the signal number itself is not used.
void OnReceivedSignal(int /*signal*/) {
  g_signal_state = kSignaled;
}

}  // namespace
32
Yuri Wiitala2b02e322019-12-03 16:59:40 -080033TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
btolschd94fe622019-05-09 14:21:40 -070034 TaskWaiter* event_waiter,
35 Clock::duration waiter_timeout)
36 : now_function_(now_function),
37 is_running_(false),
38 task_waiter_(event_waiter),
39 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070040
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070041TaskRunnerImpl::~TaskRunnerImpl() {
42 // Ensure no thread is currently executing inside RunUntilStopped().
43 OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
44}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070045
Yuri Wiitalab929b832019-06-05 17:13:15 -070046void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070047 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070048 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070049 if (task_waiter_) {
50 task_waiter_->OnTaskPosted();
51 } else {
52 run_loop_wakeup_.notify_one();
53 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070054}
55
Yuri Wiitalab929b832019-06-05 17:13:15 -070056void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
57 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070058 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalac681b472020-02-20 13:41:54 -080059 if (delay <= Clock::duration::zero()) {
60 tasks_.emplace_back(std::move(task));
61 } else {
62 delayed_tasks_.emplace(
63 std::make_pair(now_function_() + delay, std::move(task)));
64 }
btolschd94fe622019-05-09 14:21:40 -070065 if (task_waiter_) {
66 task_waiter_->OnTaskPosted();
67 } else {
68 run_loop_wakeup_.notify_one();
69 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070070}
71
Max Yakimakhabf567dc2019-09-20 13:37:04 -070072bool TaskRunnerImpl::IsRunningOnTaskRunner() {
73 return task_runner_thread_id_ == std::this_thread::get_id();
74}
75
Jordan Baylesb0c191e2019-03-26 15:49:57 -070076void TaskRunnerImpl::RunUntilStopped() {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070077 OSP_DCHECK(!is_running_);
Max Yakimakhabf567dc2019-09-20 13:37:04 -070078 task_runner_thread_id_ = std::this_thread::get_id();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070079 is_running_ = true;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070080
Jordan Baylesee059e92020-09-21 16:38:51 -070081 OSP_DVLOG << "Running tasks until stopped...";
Yuri Wiitalac05ada22019-09-24 14:25:19 -070082 // Main loop: Run until the |is_running_| flag is set back to false by the
Yuri Wiitala341085a2020-03-11 17:13:54 -070083 // "quit task" posted by RequestStopSoon(), or the process received a
84 // termination signal.
Yuri Wiitalac05ada22019-09-24 14:25:19 -070085 while (is_running_) {
86 ScheduleDelayedTasks();
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070087 if (GrabMoreRunnableTasks()) {
88 RunRunnableTasks();
89 }
Yuri Wiitala341085a2020-03-11 17:13:54 -070090 if (g_signal_state == kSignaled) {
91 is_running_ = false;
92 }
Yuri Wiitalac05ada22019-09-24 14:25:19 -070093 }
94
Jordan Baylesee059e92020-09-21 16:38:51 -070095 OSP_DVLOG << "Finished running, entering flushing phase...";
Yuri Wiitalac05ada22019-09-24 14:25:19 -070096 // Flushing phase: Ensure all immediately-runnable tasks are run before
97 // returning. Since running some tasks might cause more immediately-runnable
98 // tasks to be posted, loop until there is no more work.
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070099 //
100 // If there is bad code that posts tasks indefinitely, this loop will never
101 // break. However, that also means there is a code path spinning a CPU core at
102 // 100% all the time. Rather than mitigate this problem scenario, purposely
103 // let it manifest here in the hopes that unit testing will reveal it (e.g., a
104 // unit test that never finishes running).
105 while (GrabMoreRunnableTasks()) {
106 RunRunnableTasks();
Yuri Wiitalac05ada22019-09-24 14:25:19 -0700107 }
Jordan Baylesee059e92020-09-21 16:38:51 -0700108 OSP_DVLOG << "Finished flushing...";
Yuri Wiitalac05ada22019-09-24 14:25:19 -0700109 task_runner_thread_id_ = std::thread::id();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700110}
111
Yuri Wiitala341085a2020-03-11 17:13:54 -0700112void TaskRunnerImpl::RunUntilSignaled() {
113 OSP_CHECK_EQ(g_signal_state, kNotRunning)
114 << __func__ << " may not be invoked concurrently.";
115 g_signal_state = kNotSignaled;
116 const auto old_sigint_handler = std::signal(SIGINT, &OnReceivedSignal);
117 const auto old_sigterm_handler = std::signal(SIGTERM, &OnReceivedSignal);
118
119 RunUntilStopped();
120
121 std::signal(SIGINT, old_sigint_handler);
122 std::signal(SIGTERM, old_sigterm_handler);
Jordan Baylesee059e92020-09-21 16:38:51 -0700123 OSP_DVLOG << "Received SIGNIT or SIGTERM, setting state to not running...";
Yuri Wiitala341085a2020-03-11 17:13:54 -0700124 g_signal_state = kNotRunning;
125}
126
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700127void TaskRunnerImpl::RequestStopSoon() {
Yuri Wiitalac05ada22019-09-24 14:25:19 -0700128 PostTask([this]() { is_running_ = false; });
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700129}
130
Yuri Wiitalac05ada22019-09-24 14:25:19 -0700131void TaskRunnerImpl::RunRunnableTasks() {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700132 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Max Yakimakha371bc2b2019-09-04 10:49:17 -0700133 for (TaskWithMetadata& running_task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -0700134 // Move the task to the stack so that its bound state is freed immediately
135 // after being run.
Max Yakimakha371bc2b2019-09-04 10:49:17 -0700136 TaskWithMetadata task = std::move(running_task);
137 task();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700138 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700139 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700140}
141
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700142void TaskRunnerImpl::ScheduleDelayedTasks() {
143 std::lock_guard<std::mutex> lock(task_mutex_);
144
145 // Getting the time can be expensive on some platforms, so only get it once.
146 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700147 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
148 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
149 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700150 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700151 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700152}
153
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700154bool TaskRunnerImpl::GrabMoreRunnableTasks() {
155 OSP_DCHECK(running_tasks_.empty());
156
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700157 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700158 if (!tasks_.empty()) {
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700159 running_tasks_.swap(tasks_);
160 return true;
161 }
162
163 if (!is_running_) {
164 return false; // Stop was requested. Don't wait for more tasks.
Jordan Baylesa8e96772019-04-08 10:53:54 -0700165 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700166
btolschd94fe622019-05-09 14:21:40 -0700167 if (task_waiter_) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700168 Clock::duration timeout = waiter_timeout_;
btolschd94fe622019-05-09 14:21:40 -0700169 if (!delayed_tasks_.empty()) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700170 Clock::duration next_task_delta =
171 delayed_tasks_.begin()->first - now_function_();
172 if (next_task_delta < timeout) {
173 timeout = next_task_delta;
174 }
btolschd94fe622019-05-09 14:21:40 -0700175 }
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700176 lock.unlock();
177 task_waiter_->WaitForTaskToBePosted(timeout);
178 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700179 }
180
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700181 if (delayed_tasks_.empty()) {
182 run_loop_wakeup_.wait(lock);
183 } else {
184 run_loop_wakeup_.wait_for(lock,
185 delayed_tasks_.begin()->first - now_function_());
186 }
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700187 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700188}
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700189
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700190} // namespace openscreen