Add TaskRunner implementation to Open Screen
This patch contains an initial TaskRunner for use with Open Screen.
This initial version does not contain a Chromium wrapper, nor does
it permit cancelling. A unit test suite is included for this new
class.
Change-Id: I2404980acf3b40b78d9772925ebe11ed460cc0b6
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1544773
Reviewed-by: Yuri Wiitala <miu@chromium.org>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/api/BUILD.gn b/api/BUILD.gn
index d9dbf0b..cbe9f08 100644
--- a/api/BUILD.gn
+++ b/api/BUILD.gn
@@ -37,6 +37,7 @@
"impl/receiver_list_unittest.cc",
"impl/service_listener_impl_unittest.cc",
"impl/service_publisher_impl_unittest.cc",
+ "impl/task_runner_unittest.cc",
"public/endpoint_request_ids_unittest.cc",
"public/message_demuxer_unittest.cc",
"public/service_info_unittest.cc",
diff --git a/api/impl/BUILD.gn b/api/impl/BUILD.gn
index 4ba40a5..910932c 100644
--- a/api/impl/BUILD.gn
+++ b/api/impl/BUILD.gn
@@ -22,6 +22,9 @@
"service_listener_impl.h",
"service_publisher_impl.cc",
"service_publisher_impl.h",
+ "task_runner_factory.cc",
+ "task_runner_impl.cc",
+ "task_runner_impl.h",
]
if (use_mdns_responder) {
diff --git a/api/impl/task_runner_factory.cc b/api/impl/task_runner_factory.cc
new file mode 100644
index 0000000..fa724d0
--- /dev/null
+++ b/api/impl/task_runner_factory.cc
@@ -0,0 +1,19 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file
+
+#include "api/public/task_runner_factory.h"
+
+#include "api/impl/task_runner_impl.h"
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+
+// static
+std::unique_ptr<TaskRunner> TaskRunnerFactory::Create() {
+ return std::unique_ptr<TaskRunner>(new TaskRunnerImpl(platform::Clock::now));
+}
+
+} // namespace platform
+} // namespace openscreen
diff --git a/api/impl/task_runner_impl.cc b/api/impl/task_runner_impl.cc
new file mode 100644
index 0000000..1aa5b33
--- /dev/null
+++ b/api/impl/task_runner_impl.cc
@@ -0,0 +1,119 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file
+
+#include "api/impl/task_runner_impl.h"
+
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
+ : now_function_(now_function), is_running_(false) {}
+
+TaskRunnerImpl::~TaskRunnerImpl() {
+ RequestStopSoon();
+}
+
+void TaskRunnerImpl::PostTask(Task task) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ tasks_.push_back(std::move(task));
+ run_loop_wakeup_.notify_one();
+}
+
+void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ delayed_tasks_.emplace(std::move(task), now_function_() + delay);
+ run_loop_wakeup_.notify_one();
+}
+
+void TaskRunnerImpl::RunUntilStopped() {
+ const bool was_running = is_running_.exchange(true);
+ OSP_CHECK(!was_running);
+
+ RunTasksUntilStopped();
+}
+
+void TaskRunnerImpl::RequestStopSoon() {
+ const bool was_running = is_running_.exchange(false);
+
+ if (was_running) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ run_loop_wakeup_.notify_one();
+ }
+}
+
+void TaskRunnerImpl::RunUntilIdleForTesting() {
+ ScheduleDelayedTasks();
+ RunCurrentTasksForTesting();
+}
+
+void TaskRunnerImpl::RunCurrentTasksForTesting() {
+ std::deque<Task> current_tasks;
+
+ std::unique_lock<std::mutex> lock(task_mutex_);
+ tasks_.swap(current_tasks);
+ lock.unlock();
+
+ for (Task& task : current_tasks) {
+ task();
+ }
+}
+
+void TaskRunnerImpl::RunCurrentTasksBlocking() {
+ std::deque<Task> current_tasks;
+
+ auto lock = WaitForWorkAndAcquireLock();
+ tasks_.swap(current_tasks);
+ lock.unlock();
+
+ for (Task& task : current_tasks) {
+ task();
+ }
+}
+
+void TaskRunnerImpl::RunTasksUntilStopped() {
+ while (is_running_) {
+ ScheduleDelayedTasks();
+ RunCurrentTasksBlocking();
+ }
+}
+
+void TaskRunnerImpl::ScheduleDelayedTasks() {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+
+ // Getting the time can be expensive on some platforms, so only get it once.
+ const auto current_time = now_function_();
+ while (!delayed_tasks_.empty() &&
+ (delayed_tasks_.top().time_runnable_after < current_time)) {
+ tasks_.push_back(std::move(delayed_tasks_.top().task));
+ delayed_tasks_.pop();
+ }
+}
+
+std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
+ std::unique_lock<std::mutex> lock(task_mutex_);
+
+ // Pass a wait predicate to avoid lost or spurious wakeups.
+ const auto wait_predicate = [this] {
+ // Either we got woken up because we aren't running
+ // (probably just to end the thread), or we are running and have tasks to
+ // do.
+ return !this->is_running_ || !this->tasks_.empty() ||
+ !this->delayed_tasks_.empty();
+ };
+
+ // We don't have any work to do currently, but know we have some in the pipe.
+ if (!delayed_tasks_.empty()) {
+ run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().time_runnable_after,
+ wait_predicate);
+
+ // We don't have any work queued.
+ } else if (tasks_.empty()) {
+ run_loop_wakeup_.wait(lock, wait_predicate);
+ }
+
+ return lock;
+}
+} // namespace platform
+} // namespace openscreen
diff --git a/api/impl/task_runner_impl.h b/api/impl/task_runner_impl.h
new file mode 100644
index 0000000..ab86575
--- /dev/null
+++ b/api/impl/task_runner_impl.h
@@ -0,0 +1,97 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef API_IMPL_TASK_RUNNER_IMPL_H_
+#define API_IMPL_TASK_RUNNER_IMPL_H_
+
+#include <atomic>
+#include <condition_variable> // NOLINT
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex> // NOLINT
+#include <queue>
+#include <thread> // NOLINT
+#include <utility>
+#include <vector>
+
+#include "absl/base/thread_annotations.h"
+#include "absl/types/optional.h"
+#include "api/public/task_runner.h"
+#include "platform/api/time.h"
+
+namespace openscreen {
+namespace platform {
+
+class TaskRunnerImpl : public TaskRunner {
+ public:
+ explicit TaskRunnerImpl(platform::ClockNowFunctionPtr now_function);
+
+ // TaskRunner overrides
+ ~TaskRunnerImpl() override;
+ void PostTask(Task task) override;
+ void PostTaskWithDelay(Task task, Clock::duration delay) override;
+
+ // Tasks will only be executed if RunUntilStopped has been called, and Stop
+ // has not. Important note: TaskRunnerImpl does NOT do any threading, so
+ // calling "RunUntilStopped()" will block whatever thread you are calling it
+ // on.
+ void RunUntilStopped();
+ void RequestStopSoon();
+
+ // Execute all tasks immediately, useful for testing only. Note: this method
+ // will schedule any delayed tasks that are ready to run, but does not block
+ // waiting for delayed tasks to become eligible.
+ void RunUntilIdleForTesting();
+
+ private:
+ struct DelayedTask {
+ DelayedTask(Task task_, Clock::time_point time_runnable_after_)
+ : task(std::move(task_)), time_runnable_after(time_runnable_after_) {}
+
+ Task task;
+ Clock::time_point time_runnable_after;
+
+ bool operator>(const DelayedTask& other) const {
+ return this->time_runnable_after > other.time_runnable_after;
+ }
+ };
+
+ // Run all tasks already in the task queue.
+ void RunCurrentTasksBlocking();
+ void RunCurrentTasksForTesting();
+
+ // Loop method that runs tasks in the current thread, until the Stop
+ // method is called.
+ void RunTasksUntilStopped();
+
+ // Look at all tasks in the delayed task queue, then schedule them if the
+ // minimum delay time has elapsed.
+ void ScheduleDelayedTasks();
+
+ // Puts the task running thread into a waiting state for a notify on the
+ // run loop wakeup condition variable, and returns the acquired lock.
+ std::unique_lock<std::mutex> WaitForWorkAndAcquireLock();
+
+ const platform::ClockNowFunctionPtr now_function_;
+
+ // Atomic so that we can perform atomic exchanges.
+ std::atomic_bool is_running_;
+
+ // This mutex is used for both tasks_ and notifying the run loop to wake
+ // up when it is waiting for a task to be added to the queue in
+ // run_loop_wakeup_.
+ std::mutex task_mutex_;
+ std::priority_queue<DelayedTask,
+ std::vector<DelayedTask>,
+ std::greater<DelayedTask>>
+ delayed_tasks_ GUARDED_BY(task_mutex_);
+
+ std::condition_variable run_loop_wakeup_;
+ std::deque<Task> tasks_ GUARDED_BY(task_mutex_);
+};
+} // namespace platform
+} // namespace openscreen
+
+#endif // API_IMPL_TASK_RUNNER_IMPL_H_
diff --git a/api/impl/task_runner_unittest.cc b/api/impl/task_runner_unittest.cc
new file mode 100644
index 0000000..ab6d46e
--- /dev/null
+++ b/api/impl/task_runner_unittest.cc
@@ -0,0 +1,166 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <thread> // NOLINT
+
+#include "api/impl/task_runner_impl.h"
+#include "api/public/task_runner_factory.h"
+#include "platform/api/time.h"
+#include "third_party/googletest/src/googletest/include/gtest/gtest.h"
+
+namespace {
+using openscreen::platform::Clock;
+using std::chrono::milliseconds;
+
+const auto kTaskRunnerSleepTime = milliseconds(1);
+
+void WaitUntilCondition(std::function<bool()> predicate) {
+ while (!predicate()) {
+ std::this_thread::sleep_for(kTaskRunnerSleepTime);
+ }
+}
+} // anonymous namespace
+
+namespace openscreen {
+namespace platform {
+
+TEST(TaskRunnerTest, TaskRunnerFromFactoryExecutesTask) {
+ auto runner = TaskRunnerFactory::Create();
+
+ std::thread t([&runner] {
+ static_cast<TaskRunnerImpl*>(runner.get())->RunUntilStopped();
+ });
+
+ std::string ran_tasks = "";
+ const auto task = [&ran_tasks] { ran_tasks += "1"; };
+ EXPECT_EQ(ran_tasks, "");
+
+ runner->PostTask(task);
+
+ WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
+ EXPECT_EQ(ran_tasks, "1");
+
+ static_cast<TaskRunnerImpl*>(runner.get())->RequestStopSoon();
+ t.join();
+}
+
+TEST(TaskRunnerTest, TaskRunnerRunsDelayedTasksInOrder) {
+ auto runner = TaskRunnerFactory::Create();
+
+ std::thread t([&runner] {
+ static_cast<TaskRunnerImpl*>(runner.get())->RunUntilStopped();
+ });
+
+ std::string ran_tasks = "";
+
+ const auto kDelayTimeTaskOne = milliseconds(5);
+ const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
+ runner->PostTaskWithDelay(task_one, kDelayTimeTaskOne);
+
+ const auto kDelayTimeTaskTwo = milliseconds(10);
+ const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
+ runner->PostTaskWithDelay(task_two, kDelayTimeTaskTwo);
+
+ const auto now = platform::Clock::now();
+ const auto kMinimumClockFirstTaskShouldRunAt = now + kDelayTimeTaskOne;
+ const auto kMinimumClockSecondTaskShouldRunAt = now + kDelayTimeTaskTwo;
+ while (platform::Clock::now() < kMinimumClockFirstTaskShouldRunAt) {
+ EXPECT_EQ(ran_tasks, "");
+ std::this_thread::sleep_for(kTaskRunnerSleepTime);
+ }
+
+ WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
+ EXPECT_EQ(ran_tasks, "1");
+
+ while (platform::Clock::now() < kMinimumClockSecondTaskShouldRunAt) {
+ EXPECT_EQ(ran_tasks, "1");
+ std::this_thread::sleep_for(kTaskRunnerSleepTime);
+ }
+
+ WaitUntilCondition([&ran_tasks] { return ran_tasks == "12"; });
+ EXPECT_EQ(ran_tasks, "12");
+
+ static_cast<TaskRunnerImpl*>(runner.get())->RequestStopSoon();
+ t.join();
+}
+
+TEST(TaskRunnerTest, SingleThreadedTaskRunnerRunsSequentially) {
+ TaskRunnerImpl runner(platform::Clock::now);
+
+ std::string ran_tasks;
+ const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
+ const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
+ const auto task_three = [&ran_tasks] { ran_tasks += "3"; };
+ const auto task_four = [&ran_tasks] { ran_tasks += "4"; };
+ const auto task_five = [&ran_tasks] { ran_tasks += "5"; };
+
+ runner.PostTask(task_one);
+ runner.PostTask(task_two);
+ runner.PostTask(task_three);
+ runner.PostTask(task_four);
+ runner.PostTask(task_five);
+ EXPECT_EQ(ran_tasks, "");
+
+ runner.RunUntilIdleForTesting();
+ EXPECT_EQ(ran_tasks, "12345");
+}
+
+TEST(TaskRunnerTest, TaskRunnerCanStopRunning) {
+ TaskRunnerImpl runner(platform::Clock::now);
+
+ std::string ran_tasks;
+ const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
+ const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
+
+ runner.PostTask(task_one);
+ EXPECT_EQ(ran_tasks, "");
+
+ std::thread start_thread([&runner] { runner.RunUntilStopped(); });
+
+ WaitUntilCondition([&ran_tasks] { return !ran_tasks.empty(); });
+ EXPECT_EQ(ran_tasks, "1");
+
+ // Since Stop is called first, and the single threaded task
+ // runner should honor the queue, we know the task runner is not running
+ // since task two doesn't get ran.
+ runner.RequestStopSoon();
+ runner.PostTask(task_two);
+ EXPECT_EQ(ran_tasks, "1");
+
+ start_thread.join();
+}
+
+TEST(TaskRunnerTest, StoppingDoesNotDeleteTasks) {
+ TaskRunnerImpl runner(platform::Clock::now);
+
+ std::string ran_tasks;
+ const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
+
+ runner.PostTask(task_one);
+ runner.RequestStopSoon();
+
+ EXPECT_EQ(ran_tasks, "");
+ runner.RunUntilIdleForTesting();
+
+ EXPECT_EQ(ran_tasks, "1");
+}
+
+TEST(TaskRunnerTest, TaskRunnerIsStableWithLotsOfTasks) {
+ TaskRunnerImpl runner(platform::Clock::now);
+
+ const int kNumberOfTasks = 500;
+ std::string expected_ran_tasks;
+ expected_ran_tasks.append(kNumberOfTasks, '1');
+
+ std::string ran_tasks;
+ for (int i = 0; i < kNumberOfTasks; ++i) {
+ const auto task = [&ran_tasks] { ran_tasks += "1"; };
+ runner.PostTask(task);
+ }
+
+ runner.RunUntilIdleForTesting();
+ EXPECT_EQ(ran_tasks, expected_ran_tasks);
+}
+} // namespace platform
+} // namespace openscreen
diff --git a/api/public/BUILD.gn b/api/public/BUILD.gn
index a6894a4..28fec9f 100644
--- a/api/public/BUILD.gn
+++ b/api/public/BUILD.gn
@@ -33,6 +33,8 @@
"service_listener.h",
"service_publisher.cc",
"service_publisher.h",
+ "task_runner.h",
+ "task_runner_factory.h",
]
public_deps = [
diff --git a/api/public/task_runner.h b/api/public/task_runner.h
new file mode 100644
index 0000000..2921032
--- /dev/null
+++ b/api/public/task_runner.h
@@ -0,0 +1,45 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef API_PUBLIC_TASK_RUNNER_H_
+#define API_PUBLIC_TASK_RUNNER_H_
+
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
+#include <utility>
+
+#include "platform/api/time.h"
+
+namespace openscreen {
+namespace platform {
+
+// A thread-safe API surface that allows for posting tasks. The underlying
+// implementation may be single or multi-threaded, and all complication should
+// be handled by either the implementation class or the TaskRunnerFactory
+// method. It is the expectation of this API that the underlying impl gives
+// the following guarantees:
+// (1) Tasks shall not overlap in time/CPU.
+// (2) Tasks shall run sequentially, e.g. posting task A then B implies
+// that A shall run before B.
+// NOTE: we do not make any assumptions about what thread tasks shall run on.
+class TaskRunner {
+ public:
+ using Task = std::function<void()>;
+
+ virtual ~TaskRunner() = default;
+
+ // Takes a Task that should be run at the first convenient time.
+ virtual void PostTask(Task task) = 0;
+
+ // Takes a Task that should be run no sooner than "delay" time from now. Note
+ // that we do not guarantee it will run precisely "delay" later, merely that
+ // it will run no sooner than "delay" time from now.
+ virtual void PostTaskWithDelay(Task task, Clock::duration delay) = 0;
+};
+} // namespace platform
+} // namespace openscreen
+
+#endif // API_PUBLIC_TASK_RUNNER_H_
diff --git a/api/public/task_runner_factory.h b/api/public/task_runner_factory.h
new file mode 100644
index 0000000..ac15c46
--- /dev/null
+++ b/api/public/task_runner_factory.h
@@ -0,0 +1,28 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef API_PUBLIC_TASK_RUNNER_FACTORY_H_
+#define API_PUBLIC_TASK_RUNNER_FACTORY_H_
+
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
+#include <thread> // NOLINT
+#include <utility>
+
+#include "api/public/task_runner.h"
+
+namespace openscreen {
+namespace platform {
+
+class TaskRunnerFactory {
+ public:
+ // Creates a instantiated TaskRunner
+ static std::unique_ptr<TaskRunner> Create();
+};
+} // namespace platform
+} // namespace openscreen
+
+#endif // API_PUBLIC_TASK_RUNNER_FACTORY_H_