blob: dae79c4cbf3695fa9e52dfd5d3e310206f93d21f [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Ryan Keanea973b512019-07-29 15:50:39 -07007#include <thread>
8
Yuri Wiitala0fda1ab2019-11-20 11:43:36 -08009#include "util/logging.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -070010
11namespace openscreen {
btolschc92ba2f2019-04-10 11:46:01 -070012
Yuri Wiitala2b02e322019-12-03 16:59:40 -080013TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
btolschd94fe622019-05-09 14:21:40 -070014 TaskWaiter* event_waiter,
15 Clock::duration waiter_timeout)
16 : now_function_(now_function),
17 is_running_(false),
18 task_waiter_(event_waiter),
19 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070020
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070021TaskRunnerImpl::~TaskRunnerImpl() {
22 // Ensure no thread is currently executing inside RunUntilStopped().
23 OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
24}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070025
Yuri Wiitalab929b832019-06-05 17:13:15 -070026void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070027 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070028 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070029 if (task_waiter_) {
30 task_waiter_->OnTaskPosted();
31 } else {
32 run_loop_wakeup_.notify_one();
33 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070034}
35
Yuri Wiitalab929b832019-06-05 17:13:15 -070036void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
37 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070038 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070039 delayed_tasks_.emplace(
40 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070041 if (task_waiter_) {
42 task_waiter_->OnTaskPosted();
43 } else {
44 run_loop_wakeup_.notify_one();
45 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070046}
47
Max Yakimakhabf567dc2019-09-20 13:37:04 -070048bool TaskRunnerImpl::IsRunningOnTaskRunner() {
49 return task_runner_thread_id_ == std::this_thread::get_id();
50}
51
Jordan Baylesb0c191e2019-03-26 15:49:57 -070052void TaskRunnerImpl::RunUntilStopped() {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070053 OSP_DCHECK(!is_running_);
Max Yakimakhabf567dc2019-09-20 13:37:04 -070054 task_runner_thread_id_ = std::this_thread::get_id();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070055 is_running_ = true;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070056
Yuri Wiitalac05ada22019-09-24 14:25:19 -070057 // Main loop: Run until the |is_running_| flag is set back to false by the
58 // "quit task" posted by RequestStopSoon().
59 while (is_running_) {
60 ScheduleDelayedTasks();
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070061 if (GrabMoreRunnableTasks()) {
62 RunRunnableTasks();
63 }
Yuri Wiitalac05ada22019-09-24 14:25:19 -070064 }
65
66 // Flushing phase: Ensure all immediately-runnable tasks are run before
67 // returning. Since running some tasks might cause more immediately-runnable
68 // tasks to be posted, loop until there is no more work.
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070069 //
70 // If there is bad code that posts tasks indefinitely, this loop will never
71 // break. However, that also means there is a code path spinning a CPU core at
72 // 100% all the time. Rather than mitigate this problem scenario, purposely
73 // let it manifest here in the hopes that unit testing will reveal it (e.g., a
74 // unit test that never finishes running).
75 while (GrabMoreRunnableTasks()) {
76 RunRunnableTasks();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070077 }
78
79 task_runner_thread_id_ = std::thread::id();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070080}
81
82void TaskRunnerImpl::RequestStopSoon() {
Yuri Wiitalac05ada22019-09-24 14:25:19 -070083 PostTask([this]() { is_running_ = false; });
Jordan Baylesb0c191e2019-03-26 15:49:57 -070084}
85
Yuri Wiitalac05ada22019-09-24 14:25:19 -070086void TaskRunnerImpl::RunRunnableTasks() {
Yuri Wiitalab929b832019-06-05 17:13:15 -070087 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Max Yakimakha371bc2b2019-09-04 10:49:17 -070088 for (TaskWithMetadata& running_task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -070089 // Move the task to the stack so that its bound state is freed immediately
90 // after being run.
Max Yakimakha371bc2b2019-09-04 10:49:17 -070091 TaskWithMetadata task = std::move(running_task);
92 task();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070093 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070094 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070095}
96
Jordan Baylesb0c191e2019-03-26 15:49:57 -070097void TaskRunnerImpl::ScheduleDelayedTasks() {
98 std::lock_guard<std::mutex> lock(task_mutex_);
99
100 // Getting the time can be expensive on some platforms, so only get it once.
101 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700102 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
103 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
104 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700105 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700106 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700107}
108
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700109bool TaskRunnerImpl::GrabMoreRunnableTasks() {
110 OSP_DCHECK(running_tasks_.empty());
111
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700112 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700113 if (!tasks_.empty()) {
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700114 running_tasks_.swap(tasks_);
115 return true;
116 }
117
118 if (!is_running_) {
119 return false; // Stop was requested. Don't wait for more tasks.
Jordan Baylesa8e96772019-04-08 10:53:54 -0700120 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700121
btolschd94fe622019-05-09 14:21:40 -0700122 if (task_waiter_) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700123 Clock::duration timeout = waiter_timeout_;
btolschd94fe622019-05-09 14:21:40 -0700124 if (!delayed_tasks_.empty()) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700125 Clock::duration next_task_delta =
126 delayed_tasks_.begin()->first - now_function_();
127 if (next_task_delta < timeout) {
128 timeout = next_task_delta;
129 }
btolschd94fe622019-05-09 14:21:40 -0700130 }
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700131 lock.unlock();
132 task_waiter_->WaitForTaskToBePosted(timeout);
133 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700134 }
135
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700136 if (delayed_tasks_.empty()) {
137 run_loop_wakeup_.wait(lock);
138 } else {
139 run_loop_wakeup_.wait_for(lock,
140 delayed_tasks_.begin()->first - now_function_());
141 }
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700142 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700143}
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700144
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700145} // namespace openscreen