Delete osp_base and move files to new homes
This patch is the second and major patch in the process of removing the
osp_base folder from Open Screen. Based on the design plan here:
https://docs.google.com/document/d/1LGV8tXdDeIH38MYlNF2XJNG49pec-64nWkS0jjnJNk4/edit#heading=h.ny8tc2v4ek9m
This patch moves most of the files in osp_base to new homes in platform,
excepting files that have been moved to the new util/ folder.
Change-Id: I6e5f1d13cf20806bcc41185a842eb0b293606306
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1695736
Reviewed-by: Jordan Bayles <jophba@chromium.org>
Reviewed-by: mark a. foltz <mfoltz@chromium.org>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/platform/impl/task_runner.cc b/platform/impl/task_runner.cc
new file mode 100644
index 0000000..89886dc
--- /dev/null
+++ b/platform/impl/task_runner.cc
@@ -0,0 +1,195 @@
+// Copyright 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file
+
+#include "platform/impl/task_runner.h"
+
+#include "platform/api/logging.h"
+
+namespace openscreen {
+namespace platform {
+
+TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function,
+ TaskWaiter* event_waiter,
+ Clock::duration waiter_timeout)
+ : now_function_(now_function),
+ is_running_(false),
+ task_waiter_(event_waiter),
+ waiter_timeout_(waiter_timeout) {}
+
+TaskRunnerImpl::~TaskRunnerImpl() = default;
+
+void TaskRunnerImpl::PostPackagedTask(Task task) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ tasks_.push_back(std::move(task));
+ if (task_waiter_) {
+ task_waiter_->OnTaskPosted();
+ } else {
+ run_loop_wakeup_.notify_one();
+ }
+}
+
+void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
+ Clock::duration delay) {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ delayed_tasks_.emplace(
+ std::make_pair(now_function_() + delay, std::move(task)));
+ if (task_waiter_) {
+ task_waiter_->OnTaskPosted();
+ } else {
+ run_loop_wakeup_.notify_one();
+ }
+}
+
+void TaskRunnerImpl::RunUntilStopped() {
+ const bool was_running = is_running_.exchange(true);
+ OSP_CHECK(!was_running);
+
+ RunTasksUntilStopped();
+}
+
+void TaskRunnerImpl::RequestStopSoon() {
+ const bool was_running = is_running_.exchange(false);
+
+ if (was_running) {
+ OSP_DVLOG << "Requesting stop...";
+ if (task_waiter_) {
+ task_waiter_->OnTaskPosted();
+ } else {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+ run_loop_wakeup_.notify_one();
+ }
+ }
+}
+
+void TaskRunnerImpl::RunUntilIdleForTesting() {
+ ScheduleDelayedTasks();
+ RunCurrentTasksForTesting();
+}
+
+void TaskRunnerImpl::RunCurrentTasksForTesting() {
+ {
+ // Unlike in the RunCurrentTasksBlocking method, here we just immediately
+ // take the lock and drain the tasks_ queue. This allows tests to avoid
+ // having to do any multithreading to interact with the queue.
+ std::unique_lock<std::mutex> lock(task_mutex_);
+ OSP_DCHECK(running_tasks_.empty());
+ running_tasks_.swap(tasks_);
+ }
+
+ for (Task& task : running_tasks_) {
+ // Move the task to the stack so that its bound state is freed immediately
+ // after being run.
+ std::move(task)();
+ }
+ running_tasks_.clear();
+}
+
+void TaskRunnerImpl::RunCurrentTasksBlocking() {
+ {
+ // Wait for the lock. If there are no current tasks, we will wait until
+ // a delayed task is ready or a task gets added to the queue.
+ auto lock = WaitForWorkAndAcquireLock();
+ if (!lock) {
+ return;
+ }
+
+ OSP_DCHECK(running_tasks_.empty());
+ running_tasks_.swap(tasks_);
+ }
+
+ OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
+ for (Task& task : running_tasks_) {
+ // Move the task to the stack so that its bound state is freed immediately
+ // after being run.
+ std::move(task)();
+ }
+ running_tasks_.clear();
+}
+
+void TaskRunnerImpl::RunTasksUntilStopped() {
+ while (is_running_) {
+ ScheduleDelayedTasks();
+ RunCurrentTasksBlocking();
+ }
+}
+
+void TaskRunnerImpl::ScheduleDelayedTasks() {
+ std::lock_guard<std::mutex> lock(task_mutex_);
+
+ // Getting the time can be expensive on some platforms, so only get it once.
+ const auto current_time = now_function_();
+ const auto end_of_range = delayed_tasks_.upper_bound(current_time);
+ for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
+ tasks_.push_back(std::move(it->second));
+ }
+ delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
+}
+
+bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
+ if (!is_running_) {
+ return true;
+ }
+
+ if (!tasks_.empty()) {
+ return true;
+ }
+
+ return !delayed_tasks_.empty() &&
+ (delayed_tasks_.begin()->first <= now_function_());
+}
+
+std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
+ // These checks are redundant, as they are a subset of predicates in the
+ // below wait predicate. However, this is more readable and a slight
+ // optimization, as we don't need to take a lock if we aren't running.
+ if (!is_running_) {
+ OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
+ return {};
+ }
+
+ std::unique_lock<std::mutex> lock(task_mutex_);
+ if (!tasks_.empty()) {
+ OSP_DVLOG << "TaskRunnerImpl lock acquired.";
+ return lock;
+ }
+
+ if (task_waiter_) {
+ do {
+ Clock::duration timeout = waiter_timeout_;
+ if (!delayed_tasks_.empty()) {
+ Clock::duration next_task_delta =
+ delayed_tasks_.begin()->first - now_function_();
+ if (next_task_delta < timeout) {
+ timeout = next_task_delta;
+ }
+ }
+ lock.unlock();
+ task_waiter_->WaitForTaskToBePosted(timeout);
+ lock.lock();
+ } while (!ShouldWakeUpRunLoop());
+ } else {
+ // Pass a wait predicate to avoid lost or spurious wakeups.
+ if (!delayed_tasks_.empty()) {
+ // We don't have any work to do currently, but have some in the
+ // pipe.
+ const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
+ OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
+ run_loop_wakeup_.wait_for(lock,
+ delayed_tasks_.begin()->first - now_function_(),
+ wait_predicate);
+ } else {
+ // We don't have any work queued.
+ const auto wait_predicate = [this] {
+ return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
+ };
+ OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
+ run_loop_wakeup_.wait(lock, wait_predicate);
+ }
+ }
+
+ OSP_DVLOG << "TaskRunnerImpl lock acquired.";
+ return lock;
+}
+} // namespace platform
+} // namespace openscreen