Add TaskRunner implementation to Open Screen
This patch contains an initial TaskRunner for use with Open Screen.
This initial version does not contain a Chromium wrapper, nor does
it permit cancelling. A unit test suite is included for this new
class.
Change-Id: I2404980acf3b40b78d9772925ebe11ed460cc0b6
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1544773
Reviewed-by: Yuri Wiitala <miu@chromium.org>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/api/impl/task_runner_impl.cc b/api/impl/task_runner_impl.cc
new file mode 100644
index 0000000..1aa5b33
--- /dev/null
+++ b/api/impl/task_runner_impl.cc
@@ -0,0 +1,119 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file
+
+#include "api/impl/task_runner_impl.h"
+
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
+ : now_function_(now_function), is_running_(false) {}
+
+TaskRunnerImpl::~TaskRunnerImpl() {
+ RequestStopSoon();
+}
+
+void TaskRunnerImpl::PostTask(Task task) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ tasks_.push_back(std::move(task));
+ run_loop_wakeup_.notify_one();
+}
+
+void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ delayed_tasks_.emplace(std::move(task), now_function_() + delay);
+ run_loop_wakeup_.notify_one();
+}
+
+void TaskRunnerImpl::RunUntilStopped() {
+ const bool was_running = is_running_.exchange(true);
+ OSP_CHECK(!was_running);
+
+ RunTasksUntilStopped();
+}
+
+void TaskRunnerImpl::RequestStopSoon() {
+ const bool was_running = is_running_.exchange(false);
+
+ if (was_running) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ run_loop_wakeup_.notify_one();
+ }
+}
+
+void TaskRunnerImpl::RunUntilIdleForTesting() {
+ ScheduleDelayedTasks();
+ RunCurrentTasksForTesting();
+}
+
+void TaskRunnerImpl::RunCurrentTasksForTesting() {
+ std::deque<Task> current_tasks;
+
+ std::unique_lock<std::mutex> lock(task_mutex_);
+ tasks_.swap(current_tasks);
+ lock.unlock();
+
+ for (Task& task : current_tasks) {
+ task();
+ }
+}
+
+void TaskRunnerImpl::RunCurrentTasksBlocking() {
+ std::deque<Task> current_tasks;
+
+ auto lock = WaitForWorkAndAcquireLock();
+ tasks_.swap(current_tasks);
+ lock.unlock();
+
+ for (Task& task : current_tasks) {
+ task();
+ }
+}
+
+void TaskRunnerImpl::RunTasksUntilStopped() {
+ while (is_running_) {
+ ScheduleDelayedTasks();
+ RunCurrentTasksBlocking();
+ }
+}
+
+void TaskRunnerImpl::ScheduleDelayedTasks() {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+
+ // Getting the time can be expensive on some platforms, so only get it once.
+ const auto current_time = now_function_();
+ while (!delayed_tasks_.empty() &&
+ (delayed_tasks_.top().time_runnable_after < current_time)) {
+ tasks_.push_back(std::move(delayed_tasks_.top().task));
+ delayed_tasks_.pop();
+ }
+}
+
+std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
+ std::unique_lock<std::mutex> lock(task_mutex_);
+
+ // Pass a wait predicate to avoid lost or spurious wakeups.
+ const auto wait_predicate = [this] {
+ // Either we got woken up because we aren't running
+ // (probably just to end the thread), or we are running and have tasks to
+ // do.
+ return !this->is_running_ || !this->tasks_.empty() ||
+ !this->delayed_tasks_.empty();
+ };
+
+ // We don't have any work to do currently, but know we have some in the pipe.
+ if (!delayed_tasks_.empty()) {
+ run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().time_runnable_after,
+ wait_predicate);
+
+ // We don't have any work queued.
+ } else if (tasks_.empty()) {
+ run_loop_wakeup_.wait(lock, wait_predicate);
+ }
+
+ return lock;
+}
+} // namespace platform
+} // namespace openscreen