Add TaskRunner implementation to Open Screen

This patch contains an initial TaskRunner for use with Open Screen.
This initial version does not contain a Chromium wrapper, nor does
it permit cancelling. A unit test suite is included for this new
class.

Change-Id: I2404980acf3b40b78d9772925ebe11ed460cc0b6
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1544773
Reviewed-by: Yuri Wiitala <miu@chromium.org>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/api/BUILD.gn b/api/BUILD.gn
index d9dbf0b..cbe9f08 100644
--- a/api/BUILD.gn
+++ b/api/BUILD.gn
@@ -37,6 +37,7 @@
     "impl/receiver_list_unittest.cc",
     "impl/service_listener_impl_unittest.cc",
     "impl/service_publisher_impl_unittest.cc",
+    "impl/task_runner_unittest.cc",
     "public/endpoint_request_ids_unittest.cc",
     "public/message_demuxer_unittest.cc",
     "public/service_info_unittest.cc",
diff --git a/api/impl/BUILD.gn b/api/impl/BUILD.gn
index 4ba40a5..910932c 100644
--- a/api/impl/BUILD.gn
+++ b/api/impl/BUILD.gn
@@ -22,6 +22,9 @@
     "service_listener_impl.h",
     "service_publisher_impl.cc",
     "service_publisher_impl.h",
+    "task_runner_factory.cc",
+    "task_runner_impl.cc",
+    "task_runner_impl.h",
   ]
 
   if (use_mdns_responder) {
diff --git a/api/impl/task_runner_factory.cc b/api/impl/task_runner_factory.cc
new file mode 100644
index 0000000..fa724d0
--- /dev/null
+++ b/api/impl/task_runner_factory.cc
@@ -0,0 +1,19 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "api/public/task_runner_factory.h"
+
+#include "api/impl/task_runner_impl.h"
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+
+// static: builds a TaskRunnerImpl driven by the real platform clock.
+std::unique_ptr<TaskRunner> TaskRunnerFactory::Create() {
+  return std::unique_ptr<TaskRunner>{new TaskRunnerImpl{platform::Clock::now}};
+}
+
+}  // namespace platform
+}  // namespace openscreen
diff --git a/api/impl/task_runner_impl.cc b/api/impl/task_runner_impl.cc
new file mode 100644
index 0000000..1aa5b33
--- /dev/null
+++ b/api/impl/task_runner_impl.cc
@@ -0,0 +1,119 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "api/impl/task_runner_impl.h"
+
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
+    : now_function_{now_function}, is_running_{false} {}  // Starts stopped.
+
+TaskRunnerImpl::~TaskRunnerImpl() {
+  this->RequestStopSoon();  // Only signals the loop; does not wait for it.
+}
+
+void TaskRunnerImpl::PostTask(Task task) {
+  std::lock_guard<std::mutex> queue_guard(task_mutex_);  // Held to the end.
+  tasks_.emplace_back(std::move(task));
+  run_loop_wakeup_.notify_one();  // Wake the run loop if it is waiting.
+}
+
+void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
+  std::lock_guard<std::mutex> queue_guard(task_mutex_);
+  delayed_tasks_.push(DelayedTask(std::move(task), now_function_() + delay));
+  run_loop_wakeup_.notify_one();  // Wake the run loop so it sees the task.
+}
+
+void TaskRunnerImpl::RunUntilStopped() {
+  // Mark the loop as running; it is an error if it was already running.
+  const bool already_running = is_running_.exchange(true);
+  OSP_CHECK(!already_running);
+  RunTasksUntilStopped();  // Blocks the calling thread until stopped.
+}
+
+void TaskRunnerImpl::RequestStopSoon() {
+  // Clear the flag first; if no loop was running there is nobody to wake.
+  if (!is_running_.exchange(false)) {
+    return;
+  }
+  std::lock_guard<std::mutex> wake_guard(task_mutex_);
+  run_loop_wakeup_.notify_one();
+}
+
+void TaskRunnerImpl::RunUntilIdleForTesting() {
+  ScheduleDelayedTasks();       // Promote delayed tasks that are due now.
+  RunCurrentTasksForTesting();  // Run what is queued once, without blocking.
+}
+
+void TaskRunnerImpl::RunCurrentTasksForTesting() {
+  // Take the queue under the lock; run tasks unlocked so they can post more.
+  std::deque<Task> batch;
+  {
+    std::lock_guard<std::mutex> queue_guard(task_mutex_);
+    batch.swap(tasks_);
+  }
+  for (Task& runnable : batch) {
+    runnable();
+  }
+}
+
+void TaskRunnerImpl::RunCurrentTasksBlocking() {
+  std::deque<Task> current_tasks;
+  {
+    auto lock = WaitForWorkAndAcquireLock();
+    // The wait also ends on a stop request; then leave queued tasks alone.
+    if (is_running_) tasks_.swap(current_tasks);
+  }
+  for (Task& task : current_tasks) {  // Run with the lock released.
+    task();
+  }
+}
+
+void TaskRunnerImpl::RunTasksUntilStopped() {
+  while (is_running_.load()) {   // Cleared by RequestStopSoon().
+    ScheduleDelayedTasks();      // Promote delayed tasks that are due.
+    RunCurrentTasksBlocking();   // Run queued tasks, or wait for some.
+  }
+}
+
+void TaskRunnerImpl::ScheduleDelayedTasks() {
+  std::lock_guard<std::mutex> lock(task_mutex_);
+  // Getting the time can be expensive on some platforms, so only get it once.
+  const auto current_time = now_function_();
+  while (!delayed_tasks_.empty() &&
+         (delayed_tasks_.top().time_runnable_after <= current_time)) {
+    // top() is const: a plain std::move would copy. The entry is popped next.
+    tasks_.push_back(std::move(const_cast<Task&>(delayed_tasks_.top().task)));
+    delayed_tasks_.pop();
+  }
+}
+
+std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
+  std::unique_lock<std::mutex> lock(task_mutex_);
+
+  // Sleep until a stop is requested, an immediate task is queued, or the
+  // earliest delayed task becomes due. A pending delayed task must not count
+  // as work by itself: treating it as such makes every wait return at once
+  // and turns the run loop into a busy loop until the task is due.
+  while (is_running_ && tasks_.empty()) {
+    if (delayed_tasks_.empty()) {
+      run_loop_wakeup_.wait(lock);
+      continue;
+    }
+
+    // Re-read the deadline on every pass; a task posted while waiting may
+    // be due sooner than the one we were waiting for.
+    const auto deadline = delayed_tasks_.top().time_runnable_after;
+    if (deadline <= now_function_()) {
+      break;
+    }
+    run_loop_wakeup_.wait_until(lock, deadline);
+  }
+
+  return lock;
+}
+}  // namespace platform
+}  // namespace openscreen
diff --git a/api/impl/task_runner_impl.h b/api/impl/task_runner_impl.h
new file mode 100644
index 0000000..ab86575
--- /dev/null
+++ b/api/impl/task_runner_impl.h
@@ -0,0 +1,97 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef API_IMPL_TASK_RUNNER_IMPL_H_
+#define API_IMPL_TASK_RUNNER_IMPL_H_
+
+#include <atomic>
+#include <condition_variable>  // NOLINT
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>  // NOLINT
+#include <queue>
+#include <thread>  // NOLINT
+#include <utility>
+#include <vector>
+
+#include "absl/base/thread_annotations.h"
+#include "absl/types/optional.h"
+#include "api/public/task_runner.h"
+#include "platform/api/time.h"
+
+namespace openscreen {
+namespace platform {
+
+class TaskRunnerImpl : public TaskRunner {
+ public:
+  explicit TaskRunnerImpl(platform::ClockNowFunctionPtr now_function);
+
+  // TaskRunner overrides
+  ~TaskRunnerImpl() override;
+  void PostTask(Task task) override;
+  void PostTaskWithDelay(Task task, Clock::duration delay) override;
+
+  // Tasks will only be executed if RunUntilStopped has been called, and
+  // RequestStopSoon has not. Important note: TaskRunnerImpl does NOT do any
+  // threading, so calling "RunUntilStopped()" will block whatever thread you
+  // are calling it on.
+  void RunUntilStopped();
+  void RequestStopSoon();
+
+  // Execute all tasks immediately, useful for testing only. Note: this method
+  // will schedule any delayed tasks that are ready to run, but does not block
+  // waiting for delayed tasks to become eligible.
+  void RunUntilIdleForTesting();
+
+ private:
+  struct DelayedTask {
+    DelayedTask(Task task_, Clock::time_point time_runnable_after_)
+        : task(std::move(task_)), time_runnable_after(time_runnable_after_) {}
+
+    Task task;
+    Clock::time_point time_runnable_after;
+
+    bool operator>(const DelayedTask& other) const {
+      return this->time_runnable_after > other.time_runnable_after;
+    }
+  };
+
+  // Run all tasks already in the task queue.
+  void RunCurrentTasksBlocking();
+  void RunCurrentTasksForTesting();
+
+  // Loop method that runs tasks in the current thread, until
+  // RequestStopSoon is called.
+  void RunTasksUntilStopped();
+
+  // Look at all tasks in the delayed task queue, then schedule them if the
+  // minimum delay time has elapsed.
+  void ScheduleDelayedTasks();
+
+  // Puts the task running thread into a waiting state for a notify on the
+  // run loop wakeup condition variable, and returns the acquired lock.
+  std::unique_lock<std::mutex> WaitForWorkAndAcquireLock();
+
+  const platform::ClockNowFunctionPtr now_function_;
+
+  // Atomic so that we can perform atomic exchanges.
+  std::atomic_bool is_running_;
+
+  // This mutex guards both tasks_ and delayed_tasks_, and is also the mutex
+  // the run loop waits with on run_loop_wakeup_ until it is notified that
+  // work has been added or that it should stop.
+  std::mutex task_mutex_;
+  std::priority_queue<DelayedTask,
+                      std::vector<DelayedTask>,
+                      std::greater<DelayedTask>>
+      delayed_tasks_ GUARDED_BY(task_mutex_);
+
+  std::condition_variable run_loop_wakeup_;
+  std::deque<Task> tasks_ GUARDED_BY(task_mutex_);
+};
+}  // namespace platform
+}  // namespace openscreen
+
+#endif  // API_IMPL_TASK_RUNNER_IMPL_H_
diff --git a/api/impl/task_runner_unittest.cc b/api/impl/task_runner_unittest.cc
new file mode 100644
index 0000000..ab6d46e
--- /dev/null
+++ b/api/impl/task_runner_unittest.cc
@@ -0,0 +1,166 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <thread>  // NOLINT
+
+#include "api/impl/task_runner_impl.h"
+#include "api/public/task_runner_factory.h"
+#include "platform/api/time.h"
+#include "third_party/googletest/src/googletest/include/gtest/gtest.h"
+
+namespace {
+using openscreen::platform::Clock;
+using std::chrono::milliseconds;
+
+const auto kTaskRunnerSleepTime = milliseconds(1);
+
+void WaitUntilCondition(std::function<bool()> predicate) {
+  // Poll, sleeping between checks so the waiting thread does not spin.
+  for (; !predicate(); std::this_thread::sleep_for(kTaskRunnerSleepTime)) {
+  }
+}
+}  // anonymous namespace
+
+namespace openscreen {
+namespace platform {
+
+TEST(TaskRunnerTest, TaskRunnerFromFactoryExecutesTask) {
+  auto runner = TaskRunnerFactory::Create();
+  // The factory returns the interface type; the run/stop controls live on
+  // the implementation class.
+  auto* const impl = static_cast<TaskRunnerImpl*>(runner.get());
+
+  std::thread t([impl] { impl->RunUntilStopped(); });
+
+  std::string ran_tasks = "";
+  EXPECT_EQ(ran_tasks, "");
+  // The task runs on thread |t|, which is executing the run loop.
+  runner->PostTask([&ran_tasks] { ran_tasks += "1"; });
+
+  WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
+  EXPECT_EQ(ran_tasks, "1");
+
+  impl->RequestStopSoon();
+  t.join();
+}
+
+TEST(TaskRunnerTest, TaskRunnerRunsDelayedTasksInOrder) {
+  auto runner = TaskRunnerFactory::Create();
+
+  std::thread t([&runner] {
+    static_cast<TaskRunnerImpl*>(runner.get())->RunUntilStopped();
+  });
+
+  std::string ran_tasks = "";
+  // Sample the clock BEFORE posting: each task becomes runnable |delay| after
+  // its own post time, so only an earlier sample gives a true lower bound.
+  const auto now = platform::Clock::now();
+  const auto kDelayTimeTaskOne = milliseconds(5);
+  const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
+  runner->PostTaskWithDelay(task_one, kDelayTimeTaskOne);
+
+  const auto kDelayTimeTaskTwo = milliseconds(10);
+  const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
+  runner->PostTaskWithDelay(task_two, kDelayTimeTaskTwo);
+  const auto kMinimumClockFirstTaskShouldRunAt = now + kDelayTimeTaskOne;
+  const auto kMinimumClockSecondTaskShouldRunAt = now + kDelayTimeTaskTwo;
+  while (platform::Clock::now() < kMinimumClockFirstTaskShouldRunAt) {
+    EXPECT_EQ(ran_tasks, "");
+    std::this_thread::sleep_for(kTaskRunnerSleepTime);
+  }
+
+  WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
+  EXPECT_EQ(ran_tasks, "1");
+
+  while (platform::Clock::now() < kMinimumClockSecondTaskShouldRunAt) {
+    EXPECT_EQ(ran_tasks, "1");
+    std::this_thread::sleep_for(kTaskRunnerSleepTime);
+  }
+
+  WaitUntilCondition([&ran_tasks] { return ran_tasks == "12"; });
+  EXPECT_EQ(ran_tasks, "12");
+
+  static_cast<TaskRunnerImpl*>(runner.get())->RequestStopSoon();
+  t.join();
+}
+
+TEST(TaskRunnerTest, SingleThreadedTaskRunnerRunsSequentially) {
+  TaskRunnerImpl runner(platform::Clock::now);
+
+  // Each task appends its own digit, so the resulting string records the
+  // order in which the tasks actually ran.
+  std::string ran_tasks;
+  const auto append = [&ran_tasks](const char* digit) {
+    return [&ran_tasks, digit] { ran_tasks += digit; };
+  };
+
+  for (const char* digit : {"1", "2", "3", "4", "5"}) {
+    runner.PostTask(append(digit));
+  }
+
+  // Posting alone must not run anything; no run loop has been started.
+  EXPECT_EQ(ran_tasks, "");
+
+  runner.RunUntilIdleForTesting();
+  EXPECT_EQ(ran_tasks, "12345");
+}
+
+TEST(TaskRunnerTest, TaskRunnerCanStopRunning) {
+  TaskRunnerImpl runner(platform::Clock::now);
+
+  std::string ran_tasks;
+  const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
+  const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
+
+  runner.PostTask(task_one);
+  EXPECT_EQ(ran_tasks, "");
+
+  std::thread start_thread([&runner] { runner.RunUntilStopped(); });
+
+  WaitUntilCondition([&ran_tasks] { return !ran_tasks.empty(); });
+  EXPECT_EQ(ran_tasks, "1");
+
+  // RequestStopSoon is called before task two is posted, so a runner that
+  // honors the stop request must never run task two. NOTE(review): this is
+  // checked before join(), so a late run of task two would go unnoticed.
+  runner.RequestStopSoon();
+  runner.PostTask(task_two);
+  EXPECT_EQ(ran_tasks, "1");
+
+  start_thread.join();
+}
+
+TEST(TaskRunnerTest, StoppingDoesNotDeleteTasks) {
+  TaskRunnerImpl runner(platform::Clock::now);
+  std::string ran_tasks;
+
+  // Queue a task, then request a stop without ever starting the run loop.
+  runner.PostTask([&ran_tasks] { ran_tasks += "1"; });
+  runner.RequestStopSoon();
+  EXPECT_EQ(ran_tasks, "");
+
+  // The task must still be queued and run once the runner is pumped.
+  runner.RunUntilIdleForTesting();
+
+  EXPECT_EQ(ran_tasks, "1");
+}
+
+TEST(TaskRunnerTest, TaskRunnerIsStableWithLotsOfTasks) {
+  TaskRunnerImpl runner(platform::Clock::now);
+
+  // Every task appends one character, so the output length counts the runs.
+  constexpr int kNumberOfTasks = 500;
+  const std::string expected_ran_tasks(kNumberOfTasks, '1');
+
+  std::string ran_tasks;
+  const auto append_one = [&ran_tasks] { ran_tasks += "1"; };
+  for (int posted = 0; posted != kNumberOfTasks; ++posted) {
+    runner.PostTask(append_one);
+  }
+
+  runner.RunUntilIdleForTesting();
+  EXPECT_EQ(ran_tasks, expected_ran_tasks);
+}
+}  // namespace platform
+}  // namespace openscreen
diff --git a/api/public/BUILD.gn b/api/public/BUILD.gn
index a6894a4..28fec9f 100644
--- a/api/public/BUILD.gn
+++ b/api/public/BUILD.gn
@@ -33,6 +33,8 @@
     "service_listener.h",
     "service_publisher.cc",
     "service_publisher.h",
+    "task_runner.h",
+    "task_runner_factory.h",
   ]
 
   public_deps = [
diff --git a/api/public/task_runner.h b/api/public/task_runner.h
new file mode 100644
index 0000000..2921032
--- /dev/null
+++ b/api/public/task_runner.h
@@ -0,0 +1,45 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef API_PUBLIC_TASK_RUNNER_H_
+#define API_PUBLIC_TASK_RUNNER_H_
+
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
+#include <utility>
+
+#include "platform/api/time.h"
+
+namespace openscreen {
+namespace platform {
+
+// A thread-safe API surface that allows for posting tasks. The underlying
+// implementation may be single or multi-threaded, and all complication should
+// be handled by either the implementation class or the TaskRunnerFactory
+// method. It is the expectation of this API that the underlying impl gives
+// the following guarantees:
+// (1) Tasks shall not overlap in time/CPU.
+// (2) Tasks shall run sequentially, e.g. posting task A then B implies
+//     that A shall run before B.
+// NOTE: we do not make any assumptions about what thread tasks shall run on.
+class TaskRunner {
+ public:
+  using Task = std::function<void()>;  // Any copyable void() callable.
+
+  virtual ~TaskRunner() = default;
+
+  // Takes a Task that should be run at the first convenient time.
+  virtual void PostTask(Task task) = 0;
+
+  // Takes a Task that should be run no sooner than "delay" time from now. Note
+  // that we do not guarantee it will run precisely "delay" later, merely that
+  // it will run no sooner than "delay" time from now.
+  virtual void PostTaskWithDelay(Task task, Clock::duration delay) = 0;
+};
+}  // namespace platform
+}  // namespace openscreen
+
+#endif  // API_PUBLIC_TASK_RUNNER_H_
diff --git a/api/public/task_runner_factory.h b/api/public/task_runner_factory.h
new file mode 100644
index 0000000..ac15c46
--- /dev/null
+++ b/api/public/task_runner_factory.h
@@ -0,0 +1,28 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef API_PUBLIC_TASK_RUNNER_FACTORY_H_
+#define API_PUBLIC_TASK_RUNNER_FACTORY_H_
+
+#include <deque>
+#include <functional>
+#include <list>
+#include <memory>
+#include <thread>  // NOLINT
+#include <utility>
+
+#include "api/public/task_runner.h"
+
+namespace openscreen {
+namespace platform {
+
+class TaskRunnerFactory {
+ public:
+  // Creates a new TaskRunner instance; the caller owns the returned object.
+  static std::unique_ptr<TaskRunner> Create();
+};
+}  // namespace platform
+}  // namespace openscreen
+
+#endif  // API_PUBLIC_TASK_RUNNER_FACTORY_H_