blob: 9bacacf4530b2db417f0d5ef0155e649f8c1d520 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Ryan Keanea973b512019-07-29 15:50:39 -07007#include <thread>
8
Jordan Baylesb0c191e2019-03-26 15:49:57 -07009#include "platform/api/logging.h"
10
11namespace openscreen {
12namespace platform {
btolschc92ba2f2019-04-10 11:46:01 -070013
btolschd94fe622019-05-09 14:21:40 -070014TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
15 TaskWaiter* event_waiter,
16 Clock::duration waiter_timeout)
17 : now_function_(now_function),
18 is_running_(false),
19 task_waiter_(event_waiter),
20 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070021
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070022TaskRunnerImpl::~TaskRunnerImpl() {
23 // Ensure no thread is currently executing inside RunUntilStopped().
24 OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
25}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070026
Yuri Wiitalab929b832019-06-05 17:13:15 -070027void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070028 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070029 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070030 if (task_waiter_) {
31 task_waiter_->OnTaskPosted();
32 } else {
33 run_loop_wakeup_.notify_one();
34 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070035}
36
Yuri Wiitalab929b832019-06-05 17:13:15 -070037void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
38 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070039 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalab929b832019-06-05 17:13:15 -070040 delayed_tasks_.emplace(
41 std::make_pair(now_function_() + delay, std::move(task)));
btolschd94fe622019-05-09 14:21:40 -070042 if (task_waiter_) {
43 task_waiter_->OnTaskPosted();
44 } else {
45 run_loop_wakeup_.notify_one();
46 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070047}
48
Max Yakimakhabf567dc2019-09-20 13:37:04 -070049bool TaskRunnerImpl::IsRunningOnTaskRunner() {
50 return task_runner_thread_id_ == std::this_thread::get_id();
51}
52
Jordan Baylesb0c191e2019-03-26 15:49:57 -070053void TaskRunnerImpl::RunUntilStopped() {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070054 OSP_DCHECK(!is_running_);
Max Yakimakhabf567dc2019-09-20 13:37:04 -070055 task_runner_thread_id_ = std::this_thread::get_id();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070056 is_running_ = true;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070057
Yuri Wiitalac05ada22019-09-24 14:25:19 -070058 // Main loop: Run until the |is_running_| flag is set back to false by the
59 // "quit task" posted by RequestStopSoon().
60 while (is_running_) {
61 ScheduleDelayedTasks();
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070062 if (GrabMoreRunnableTasks()) {
63 RunRunnableTasks();
64 }
Yuri Wiitalac05ada22019-09-24 14:25:19 -070065 }
66
67 // Flushing phase: Ensure all immediately-runnable tasks are run before
68 // returning. Since running some tasks might cause more immediately-runnable
69 // tasks to be posted, loop until there is no more work.
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070070 //
71 // If there is bad code that posts tasks indefinitely, this loop will never
72 // break. However, that also means there is a code path spinning a CPU core at
73 // 100% all the time. Rather than mitigate this problem scenario, purposely
74 // let it manifest here in the hopes that unit testing will reveal it (e.g., a
75 // unit test that never finishes running).
76 while (GrabMoreRunnableTasks()) {
77 RunRunnableTasks();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070078 }
79
80 task_runner_thread_id_ = std::thread::id();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070081}
82
83void TaskRunnerImpl::RequestStopSoon() {
Yuri Wiitalac05ada22019-09-24 14:25:19 -070084 PostTask([this]() { is_running_ = false; });
Jordan Baylesb0c191e2019-03-26 15:49:57 -070085}
86
Yuri Wiitalac05ada22019-09-24 14:25:19 -070087void TaskRunnerImpl::RunRunnableTasks() {
Yuri Wiitalab929b832019-06-05 17:13:15 -070088 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Max Yakimakha371bc2b2019-09-04 10:49:17 -070089 for (TaskWithMetadata& running_task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -070090 // Move the task to the stack so that its bound state is freed immediately
91 // after being run.
Max Yakimakha371bc2b2019-09-04 10:49:17 -070092 TaskWithMetadata task = std::move(running_task);
93 task();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070094 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070095 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070096}
97
Jordan Baylesb0c191e2019-03-26 15:49:57 -070098void TaskRunnerImpl::ScheduleDelayedTasks() {
99 std::lock_guard<std::mutex> lock(task_mutex_);
100
101 // Getting the time can be expensive on some platforms, so only get it once.
102 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700103 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
104 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
105 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700106 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700107 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700108}
109
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700110bool TaskRunnerImpl::GrabMoreRunnableTasks() {
111 OSP_DCHECK(running_tasks_.empty());
112
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700113 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700114 if (!tasks_.empty()) {
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700115 running_tasks_.swap(tasks_);
116 return true;
117 }
118
119 if (!is_running_) {
120 return false; // Stop was requested. Don't wait for more tasks.
Jordan Baylesa8e96772019-04-08 10:53:54 -0700121 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700122
btolschd94fe622019-05-09 14:21:40 -0700123 if (task_waiter_) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700124 Clock::duration timeout = waiter_timeout_;
btolschd94fe622019-05-09 14:21:40 -0700125 if (!delayed_tasks_.empty()) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700126 Clock::duration next_task_delta =
127 delayed_tasks_.begin()->first - now_function_();
128 if (next_task_delta < timeout) {
129 timeout = next_task_delta;
130 }
btolschd94fe622019-05-09 14:21:40 -0700131 }
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700132 lock.unlock();
133 task_waiter_->WaitForTaskToBePosted(timeout);
134 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700135 }
136
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700137 if (delayed_tasks_.empty()) {
138 run_loop_wakeup_.wait(lock);
139 } else {
140 run_loop_wakeup_.wait_for(lock,
141 delayed_tasks_.begin()->first - now_function_());
142 }
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700143 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700144}
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700145
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700146} // namespace platform
147} // namespace openscreen