blob: d735f3c151d6809a18b1ac7282d4d3ea9c87486d [file] [log] [blame]
Jordan Baylesb0c191e2019-03-26 15:49:57 -07001// Copyright 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file
4
Jordan Baylesa26582d2019-07-10 14:44:58 -07005#include "platform/impl/task_runner.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -07006
Ryan Keanea973b512019-07-29 15:50:39 -07007#include <thread>
8
Yuri Wiitala0fda1ab2019-11-20 11:43:36 -08009#include "util/logging.h"
Jordan Baylesb0c191e2019-03-26 15:49:57 -070010
11namespace openscreen {
btolschc92ba2f2019-04-10 11:46:01 -070012
Yuri Wiitala2b02e322019-12-03 16:59:40 -080013TaskRunnerImpl::TaskRunnerImpl(ClockNowFunctionPtr now_function,
btolschd94fe622019-05-09 14:21:40 -070014 TaskWaiter* event_waiter,
15 Clock::duration waiter_timeout)
16 : now_function_(now_function),
17 is_running_(false),
18 task_waiter_(event_waiter),
19 waiter_timeout_(waiter_timeout) {}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070020
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070021TaskRunnerImpl::~TaskRunnerImpl() {
22 // Ensure no thread is currently executing inside RunUntilStopped().
23 OSP_DCHECK_EQ(task_runner_thread_id_, std::thread::id());
24}
Jordan Baylesb0c191e2019-03-26 15:49:57 -070025
Yuri Wiitalab929b832019-06-05 17:13:15 -070026void TaskRunnerImpl::PostPackagedTask(Task task) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070027 std::lock_guard<std::mutex> lock(task_mutex_);
Ryan Keaneefab2ed2019-07-22 12:36:53 -070028 tasks_.emplace_back(std::move(task));
btolschd94fe622019-05-09 14:21:40 -070029 if (task_waiter_) {
30 task_waiter_->OnTaskPosted();
31 } else {
32 run_loop_wakeup_.notify_one();
33 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070034}
35
Yuri Wiitalab929b832019-06-05 17:13:15 -070036void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
37 Clock::duration delay) {
Jordan Baylesb0c191e2019-03-26 15:49:57 -070038 std::lock_guard<std::mutex> lock(task_mutex_);
Yuri Wiitalac681b472020-02-20 13:41:54 -080039 if (delay <= Clock::duration::zero()) {
40 tasks_.emplace_back(std::move(task));
41 } else {
42 delayed_tasks_.emplace(
43 std::make_pair(now_function_() + delay, std::move(task)));
44 }
btolschd94fe622019-05-09 14:21:40 -070045 if (task_waiter_) {
46 task_waiter_->OnTaskPosted();
47 } else {
48 run_loop_wakeup_.notify_one();
49 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -070050}
51
Max Yakimakhabf567dc2019-09-20 13:37:04 -070052bool TaskRunnerImpl::IsRunningOnTaskRunner() {
53 return task_runner_thread_id_ == std::this_thread::get_id();
54}
55
Jordan Baylesb0c191e2019-03-26 15:49:57 -070056void TaskRunnerImpl::RunUntilStopped() {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -070057 OSP_DCHECK(!is_running_);
Max Yakimakhabf567dc2019-09-20 13:37:04 -070058 task_runner_thread_id_ = std::this_thread::get_id();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070059 is_running_ = true;
Jordan Baylesb0c191e2019-03-26 15:49:57 -070060
Yuri Wiitalac05ada22019-09-24 14:25:19 -070061 // Main loop: Run until the |is_running_| flag is set back to false by the
62 // "quit task" posted by RequestStopSoon().
63 while (is_running_) {
64 ScheduleDelayedTasks();
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070065 if (GrabMoreRunnableTasks()) {
66 RunRunnableTasks();
67 }
Yuri Wiitalac05ada22019-09-24 14:25:19 -070068 }
69
70 // Flushing phase: Ensure all immediately-runnable tasks are run before
71 // returning. Since running some tasks might cause more immediately-runnable
72 // tasks to be posted, loop until there is no more work.
Yuri Wiitala71a84bc2019-09-25 12:00:23 -070073 //
74 // If there is bad code that posts tasks indefinitely, this loop will never
75 // break. However, that also means there is a code path spinning a CPU core at
76 // 100% all the time. Rather than mitigate this problem scenario, purposely
77 // let it manifest here in the hopes that unit testing will reveal it (e.g., a
78 // unit test that never finishes running).
79 while (GrabMoreRunnableTasks()) {
80 RunRunnableTasks();
Yuri Wiitalac05ada22019-09-24 14:25:19 -070081 }
82
83 task_runner_thread_id_ = std::thread::id();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070084}
85
86void TaskRunnerImpl::RequestStopSoon() {
Yuri Wiitalac05ada22019-09-24 14:25:19 -070087 PostTask([this]() { is_running_ = false; });
Jordan Baylesb0c191e2019-03-26 15:49:57 -070088}
89
Yuri Wiitalac05ada22019-09-24 14:25:19 -070090void TaskRunnerImpl::RunRunnableTasks() {
Yuri Wiitalab929b832019-06-05 17:13:15 -070091 OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
Max Yakimakha371bc2b2019-09-04 10:49:17 -070092 for (TaskWithMetadata& running_task : running_tasks_) {
Yuri Wiitalab929b832019-06-05 17:13:15 -070093 // Move the task to the stack so that its bound state is freed immediately
94 // after being run.
Max Yakimakha371bc2b2019-09-04 10:49:17 -070095 TaskWithMetadata task = std::move(running_task);
96 task();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070097 }
Yuri Wiitalab929b832019-06-05 17:13:15 -070098 running_tasks_.clear();
Jordan Baylesb0c191e2019-03-26 15:49:57 -070099}
100
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700101void TaskRunnerImpl::ScheduleDelayedTasks() {
102 std::lock_guard<std::mutex> lock(task_mutex_);
103
104 // Getting the time can be expensive on some platforms, so only get it once.
105 const auto current_time = now_function_();
Yuri Wiitalab929b832019-06-05 17:13:15 -0700106 const auto end_of_range = delayed_tasks_.upper_bound(current_time);
107 for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
108 tasks_.push_back(std::move(it->second));
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700109 }
Yuri Wiitalab929b832019-06-05 17:13:15 -0700110 delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700111}
112
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700113bool TaskRunnerImpl::GrabMoreRunnableTasks() {
114 OSP_DCHECK(running_tasks_.empty());
115
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700116 std::unique_lock<std::mutex> lock(task_mutex_);
Jordan Baylesa8e96772019-04-08 10:53:54 -0700117 if (!tasks_.empty()) {
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700118 running_tasks_.swap(tasks_);
119 return true;
120 }
121
122 if (!is_running_) {
123 return false; // Stop was requested. Don't wait for more tasks.
Jordan Baylesa8e96772019-04-08 10:53:54 -0700124 }
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700125
btolschd94fe622019-05-09 14:21:40 -0700126 if (task_waiter_) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700127 Clock::duration timeout = waiter_timeout_;
btolschd94fe622019-05-09 14:21:40 -0700128 if (!delayed_tasks_.empty()) {
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700129 Clock::duration next_task_delta =
130 delayed_tasks_.begin()->first - now_function_();
131 if (next_task_delta < timeout) {
132 timeout = next_task_delta;
133 }
btolschd94fe622019-05-09 14:21:40 -0700134 }
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700135 lock.unlock();
136 task_waiter_->WaitForTaskToBePosted(timeout);
137 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700138 }
139
Yuri Wiitalaef8fc082019-09-25 17:21:23 -0700140 if (delayed_tasks_.empty()) {
141 run_loop_wakeup_.wait(lock);
142 } else {
143 run_loop_wakeup_.wait_for(lock,
144 delayed_tasks_.begin()->first - now_function_());
145 }
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700146 return false;
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700147}
Yuri Wiitala71a84bc2019-09-25 12:00:23 -0700148
Jordan Baylesb0c191e2019-03-26 15:49:57 -0700149} // namespace openscreen