Add TaskRunner implementation to Open Screen

This patch contains an initial TaskRunner for use with Open Screen.
This initial version does not contain a Chromium wrapper, nor does
it permit cancelling. A unit test suite is included for this new
class.

Change-Id: I2404980acf3b40b78d9772925ebe11ed460cc0b6
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1544773
Reviewed-by: Yuri Wiitala <miu@chromium.org>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/api/impl/task_runner_impl.cc b/api/impl/task_runner_impl.cc
new file mode 100644
index 0000000..1aa5b33
--- /dev/null
+++ b/api/impl/task_runner_impl.cc
@@ -0,0 +1,119 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "api/impl/task_runner_impl.h"
+
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
+    : now_function_(now_function), is_running_(false) {}
+
+TaskRunnerImpl::~TaskRunnerImpl() {
+  RequestStopSoon();  // Signals the run loop to stop; does not wait for it to exit.
+}
+
+void TaskRunnerImpl::PostTask(Task task) {
+  std::lock_guard<std::mutex> guard(task_mutex_);  // Protects tasks_.
+  tasks_.emplace_back(std::move(task));
+  run_loop_wakeup_.notify_one();  // Wake the run loop, if it is waiting.
+}
+
+void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
+  std::lock_guard<std::mutex> guard(task_mutex_);  // Protects delayed_tasks_.
+  delayed_tasks_.emplace(std::move(task), now_function_() + delay);
+  run_loop_wakeup_.notify_one();  // Wake the run loop, if it is waiting.
+}
+
+void TaskRunnerImpl::RunUntilStopped() {
+  // Flip the flag atomically; finding it already set means a second run loop
+  // was started on the same runner, which is treated as a fatal error.
+  const bool was_running = is_running_.exchange(true);
+  OSP_CHECK(!was_running);
+  RunTasksUntilStopped();
+}
+
+void TaskRunnerImpl::RequestStopSoon() {
+  // Only a runner that was actually running has a loop that needs waking.
+  if (!is_running_.exchange(false)) {
+    return;
+  }
+  std::lock_guard<std::mutex> guard(task_mutex_);
+  run_loop_wakeup_.notify_one();
+}
+
+void TaskRunnerImpl::RunUntilIdleForTesting() {
+  ScheduleDelayedTasks();       // Promote delayed tasks that are already due.
+  RunCurrentTasksForTesting();  // Run one batch; tasks posted meanwhile stay queued.
+}
+
+void TaskRunnerImpl::RunCurrentTasksForTesting() {
+  // Swap the queue out under the lock; tasks then run with the mutex released.
+  std::deque<Task> batch;
+  {
+    std::lock_guard<std::mutex> guard(task_mutex_);
+    batch.swap(tasks_);
+  }
+  for (Task& runnable : batch) {
+    runnable();
+  }
+}
+
+void TaskRunnerImpl::RunCurrentTasksBlocking() {
+  // The helper may block; it hands back task_mutex_ in the locked state.
+  std::unique_lock<std::mutex> held = WaitForWorkAndAcquireLock();
+  std::deque<Task> batch;
+  batch.swap(tasks_);
+  held.unlock();
+
+  for (Task& runnable : batch) {
+    runnable();
+  }
+}
+
+void TaskRunnerImpl::RunTasksUntilStopped() {
+  while (is_running_.load()) {  // Re-checked after every batch of tasks.
+    ScheduleDelayedTasks();
+    RunCurrentTasksBlocking();
+  }
+}
+
+void TaskRunnerImpl::ScheduleDelayedTasks() {
+  std::lock_guard<std::mutex> guard(task_mutex_);
+  // Sample the clock once; reading it can be expensive on some platforms.
+  const auto now = now_function_();
+  while (!delayed_tasks_.empty()) {
+    if (delayed_tasks_.top().time_runnable_after >= now) break;
+    // NOTE(review): if top() is const here, this move degrades to a copy.
+    tasks_.push_back(std::move(delayed_tasks_.top().task));
+    delayed_tasks_.pop();
+  }
+}
+
+// Blocks until the run loop has a reason to wake (runnable task, stop request,
+// or the earliest delayed task's deadline) and returns with task_mutex_ held.
+std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
+  std::unique_lock<std::mutex> lock(task_mutex_);
+
+  // Checked under the lock so a concurrent post or stop cannot be missed.
+  if (!is_running_ || !tasks_.empty()) {
+    return lock;
+  }
+
+  if (!delayed_tasks_.empty()) {
+    // No predicate on purpose: treating "delayed_tasks_ is non-empty" as a
+    // wake condition made this return at once, so the run loop busy-spun
+    // until the deadline. Every wakeup just re-enters the loop's checks.
+    run_loop_wakeup_.wait_until(lock,
+                                delayed_tasks_.top().time_runnable_after);
+  } else {
+    // Fully idle: block until a task is posted or a stop is requested.
+    run_loop_wakeup_.wait(lock, [this] {
+      return !is_running_ || !tasks_.empty() || !delayed_tasks_.empty();
+    });
+  }
+  return lock;
+}
+}  // namespace platform
+}  // namespace openscreen