TaskRunnerImpl stops by posting a "quit task" to itself.

This changes the behavior of RequestStopSoon(): Before, it was
performing a synchronized immediate stop of the run loop. Now, it simply
posts a task that will cause the run loop to stop when it comes across a
"quit task" in the queue of "run-now" tasks.

Rationale: It greatly simplifies applications (e.g., demos) and the unit
tests if the task runner will guarantee that all previously-posted
(non-delayed) tasks have run before it stops. Specifically, tasks can be
posted to a task runner that executes on another thread, then the "quit
task," and then the application only needs to join on the thread to know
that everything that had to be run has completed.

This change also allowed for a ton of simplification in the unit tests:
For example, TaskRunnerImpl::RunUntilIdleForTesting() is no longer
needed, and was removed. The tests are now using only the API that is
available to regular, non-test code.

Change-Id: I6df1cc917cb5299526a2cdc32a2c48fe44fd0951
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1820181
Reviewed-by: Jordan Bayles <jophba@chromium.org>
Reviewed-by: Ryan Keane <rwkeane@google.com>
Reviewed-by: Yuri Wiitala <miu@chromium.org>
Commit-Queue: Yuri Wiitala <miu@chromium.org>
diff --git a/platform/impl/task_runner.cc b/platform/impl/task_runner.cc
index 3e957a1..a03842e 100644
--- a/platform/impl/task_runner.cc
+++ b/platform/impl/task_runner.cc
@@ -48,51 +48,39 @@
 }
 
 void TaskRunnerImpl::RunUntilStopped() {
+  OSP_CHECK(!is_running_);
   task_runner_thread_id_ = std::this_thread::get_id();
-  const bool was_running = is_running_.exchange(true);
-  OSP_CHECK(!was_running);
+  is_running_ = true;
 
-  RunTasksUntilStopped();
+  // Main loop: Run until the |is_running_| flag is set back to false by the
+  // "quit task" posted by RequestStopSoon().
+  while (is_running_) {
+    ScheduleDelayedTasks();
+    RunTasksAfterWaiting();
+  }
+
+  // Flushing phase: Ensure all immediately-runnable tasks are run before
+  // returning. Since running some tasks might cause more immediately-runnable
+  // tasks to be posted, loop until there is no more work.
+  {
+    std::unique_lock<std::mutex> lock(task_mutex_);
+    while (!tasks_.empty()) {
+      OSP_DCHECK(running_tasks_.empty());
+      running_tasks_.swap(tasks_);
+      lock.unlock();
+      RunRunnableTasks();
+      lock.lock();
+    }
+  }
+
+  task_runner_thread_id_ = std::thread::id();
 }
 
 void TaskRunnerImpl::RequestStopSoon() {
-  const bool was_running = is_running_.exchange(false);
-
-  if (was_running) {
-    OSP_DVLOG << "Requesting stop...";
-    if (task_waiter_) {
-      task_waiter_->OnTaskPosted();
-    } else {
-      std::lock_guard<std::mutex> lock(task_mutex_);
-      run_loop_wakeup_.notify_one();
-    }
-  }
+  PostTask([this]() { is_running_ = false; });
 }
 
-void TaskRunnerImpl::RunUntilIdleForTesting() {
-  ScheduleDelayedTasks();
-  RunCurrentTasksForTesting();
-}
-
-void TaskRunnerImpl::RunCurrentTasksForTesting() {
-  {
-    // Unlike in the RunCurrentTasksBlocking method, here we just immediately
-    // take the lock and drain the tasks_ queue. This allows tests to avoid
-    // having to do any multithreading to interact with the queue.
-    std::unique_lock<std::mutex> lock(task_mutex_);
-    OSP_DCHECK(running_tasks_.empty());
-    running_tasks_.swap(tasks_);
-  }
-
-  for (TaskWithMetadata& task : running_tasks_) {
-    // Move the task to the stack so that its bound state is freed immediately
-    // after being run.
-    std::move(task)();
-  }
-  running_tasks_.clear();
-}
-
-void TaskRunnerImpl::RunCurrentTasksBlocking() {
+void TaskRunnerImpl::RunTasksAfterWaiting() {
   {
     // Wait for the lock. If there are no current tasks, we will wait until
     // a delayed task is ready or a task gets added to the queue.
@@ -105,6 +93,10 @@
     running_tasks_.swap(tasks_);
   }
 
+  RunRunnableTasks();
+}
+
+void TaskRunnerImpl::RunRunnableTasks() {
   OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
   for (TaskWithMetadata& running_task : running_tasks_) {
     // Move the task to the stack so that its bound state is freed immediately
@@ -115,14 +107,6 @@
   running_tasks_.clear();
 }
 
-void TaskRunnerImpl::RunTasksUntilStopped() {
-  while (is_running_) {
-    ScheduleDelayedTasks();
-    RunCurrentTasksBlocking();
-  }
-  task_runner_thread_id_ = std::thread::id();
-}
-
 void TaskRunnerImpl::ScheduleDelayedTasks() {
   std::lock_guard<std::mutex> lock(task_mutex_);
 
@@ -149,14 +133,6 @@
 }
 
 std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
-  // These checks are redundant, as they are a subset of predicates in the
-  // below wait predicate. However, this is more readable and a slight
-  // optimization, as we don't need to take a lock if we aren't running.
-  if (!is_running_) {
-    OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
-    return {};
-  }
-
   std::unique_lock<std::mutex> lock(task_mutex_);
   if (!tasks_.empty()) {
     OSP_DVLOG << "TaskRunnerImpl lock acquired.";
@@ -179,19 +155,16 @@
     } while (!ShouldWakeUpRunLoop());
   } else {
     // Pass a wait predicate to avoid lost or spurious wakeups.
+    const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
     if (!delayed_tasks_.empty()) {
       // We don't have any work to do currently, but have some in the
       // pipe.
-      const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
       OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
       run_loop_wakeup_.wait_for(lock,
                                 delayed_tasks_.begin()->first - now_function_(),
                                 wait_predicate);
     } else {
       // We don't have any work queued.
-      const auto wait_predicate = [this] {
-        return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
-      };
       OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
       run_loop_wakeup_.wait(lock, wait_predicate);
     }
diff --git a/platform/impl/task_runner.h b/platform/impl/task_runner.h
index 569b747..d450745 100644
--- a/platform/impl/task_runner.h
+++ b/platform/impl/task_runner.h
@@ -5,7 +5,6 @@
 #ifndef PLATFORM_IMPL_TASK_RUNNER_H_
 #define PLATFORM_IMPL_TASK_RUNNER_H_
 
-#include <atomic>
 #include <condition_variable>  // NOLINT
 #include <map>
 #include <memory>
@@ -59,22 +58,19 @@
   void PostPackagedTaskWithDelay(Task task, Clock::duration delay) final;
   bool IsRunningOnTaskRunner() final;
 
-  // Tasks will only be executed if RunUntilStopped has been called, and
-  // RequestStopSoon has not. Important note: TaskRunner does NOT do any
-  // threading, so calling "RunUntilStopped()" will block whatever thread you
-  // are calling it on.
+  // Blocks the current thread, executing tasks from the queue with the desired
+  // timing; and does not return until some time after RequestStopSoon() is
+  // called.
   void RunUntilStopped();
 
-  // Thread-safe method for requesting the TaskRunner to stop running. This sets
-  // a flag that will get checked in the run loop, typically after completing
-  // the current task.
+  // Thread-safe method for requesting the TaskRunner to stop running after all
+  // non-delayed tasks in the queue have run. This behavior allows final
+  // clean-up tasks to be executed before the TaskRunner stops.
+  //
+  // If any non-delayed tasks post additional non-delayed tasks, those will be
+  // run as well before returning.
   void RequestStopSoon();
 
-  // Execute all tasks immediately, useful for testing only. Note: this method
-  // will schedule any delayed tasks that are ready to run, but does not block
-  // waiting for delayed tasks to become eligible.
-  void RunUntilIdleForTesting();
-
  private:
 #ifndef TRACE_FORCE_DISABLE
   // Wrapper around a Task used to store the TraceId Metadata along with the
@@ -101,18 +97,12 @@
   using TaskWithMetadata = Task;
 #endif  // TRACE_FORCE_DISABLE
 
-  // Run all tasks already in the task queue. If the queue is empty, wait for
-  // either (1) a delayed task to become available, or (2) a task to be added
-  // to the queue.
-  void RunCurrentTasksBlocking();
+  // If necessary, wait for either (1) a delayed task to become available, or
+  // (2) a task to be added to the queue; and then run all runnable tasks.
+  void RunTasksAfterWaiting();
 
-  // Run tasks already in the queue, for testing. If the queue is empty, this
-  // method does not block but instead returns immediately.
-  void RunCurrentTasksForTesting();
-
-  // Loop method that runs tasks in the current thread, until the
-  // RequestStopSoon method is called.
-  void RunTasksUntilStopped();
+  // Helper that runs all tasks in |running_tasks_| and then clears it.
+  void RunRunnableTasks();
 
   // Look at all tasks in the delayed task queue, then schedule them if the
   // minimum delay time has elapsed.
@@ -129,8 +119,9 @@
 
   const platform::ClockNowFunctionPtr now_function_;
 
-  // Atomic so that we can perform atomic exchanges.
-  std::atomic_bool is_running_;
+  // Flag that indicates whether the task runner loop should continue. This is
+  // only meant to be read/written on the thread executing RunUntilStopped().
+  bool is_running_;
 
   // This mutex is used for |tasks_| and |delayed_tasks_|, and also for
   // notifying the run loop to wake up when it is waiting for a task to be added
diff --git a/platform/impl/task_runner_unittest.cc b/platform/impl/task_runner_unittest.cc
index b8d582c..34a33da 100644
--- a/platform/impl/task_runner_unittest.cc
+++ b/platform/impl/task_runner_unittest.cc
@@ -50,7 +50,7 @@
 
   void WakeUpAndStop() {
     OnTaskPosted();
-    task_runner_->PostTask([this]() { task_runner_->RequestStopSoon(); });
+    task_runner_->RequestStopSoon();
   }
 
   bool IsWaiting() const { return waiting_.load(); }
@@ -85,23 +85,16 @@
 
 }  // anonymous namespace
 
-TEST(TaskRunnerImplTest, TaskRunnerExecutesTask) {
+TEST(TaskRunnerImplTest, TaskRunnerExecutesTaskAndStops) {
   FakeClock fake_clock{platform::Clock::time_point(milliseconds(1337))};
-  auto runner = std::make_unique<TaskRunnerImpl>(&fake_clock.now);
-
-  std::thread t([&runner] { runner.get()->RunUntilStopped(); });
+  TaskRunnerImpl runner(&fake_clock.now);
 
   std::string ran_tasks = "";
-  const auto task = [&ran_tasks] { ran_tasks += "1"; };
-  EXPECT_EQ(ran_tasks, "");
+  runner.PostTask([&ran_tasks] { ran_tasks += "1"; });
+  runner.RequestStopSoon();
 
-  runner->PostTask(task);
-
-  WaitUntilCondition([&ran_tasks] { return ran_tasks == "1"; });
+  runner.RunUntilStopped();
   EXPECT_EQ(ran_tasks, "1");
-
-  runner.get()->RequestStopSoon();
-  t.join();
 }
 
 TEST(TaskRunnerImplTest, TaskRunnerRunsDelayedTasksInOrder) {
@@ -148,52 +141,43 @@
   runner.PostTask(task_three);
   runner.PostTask(task_four);
   runner.PostTask(task_five);
+  runner.RequestStopSoon();
   EXPECT_EQ(ran_tasks, "");
 
-  runner.RunUntilIdleForTesting();
+  runner.RunUntilStopped();
   EXPECT_EQ(ran_tasks, "12345");
 }
 
-TEST(TaskRunnerImplTest, TaskRunnerCanStopRunning) {
+TEST(TaskRunnerImplTest, RunsAllImmediateTasksBeforeStopping) {
   FakeClock fake_clock{platform::Clock::time_point(milliseconds(1337))};
   TaskRunnerImpl runner(&fake_clock.now);
 
-  std::string ran_tasks;
-  const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
-  const auto task_two = [&ran_tasks] { ran_tasks += "2"; };
+  std::string result;
+  runner.PostTask([&] {
+    result += "Alice";
 
-  runner.PostTask(task_one);
-  EXPECT_EQ(ran_tasks, "");
+    // Post a task that runs just before the quit task.
+    runner.PostTask([&] {
+      result += " says goodbye";
 
-  std::thread start_thread([&runner] { runner.RunUntilStopped(); });
+      // These tasks will enter the queue after the quit task *and* after the
+      // main loop breaks. They will be executed by the flushing phase.
+      runner.PostTask([&] {
+        result += " and is not";
+        runner.PostTask([&] { result += " forgotten."; });
+      });
+    });
 
-  WaitUntilCondition([&ran_tasks] { return !ran_tasks.empty(); });
-  EXPECT_EQ(ran_tasks, "1");
+    // Post the quit task.
+    runner.RequestStopSoon();
+  });
 
-  // Since Stop is called first, and the single threaded task
-  // runner should honor the queue, we know the task runner is not running
-  // since task two doesn't get ran.
-  runner.RequestStopSoon();
-  runner.PostTask(task_two);
-  EXPECT_EQ(ran_tasks, "1");
-
-  start_thread.join();
-}
-
-TEST(TaskRunnerImplTest, StoppingDoesNotDeleteTasks) {
-  FakeClock fake_clock{platform::Clock::time_point(milliseconds(1337))};
-  TaskRunnerImpl runner(&fake_clock.now);
-
-  std::string ran_tasks;
-  const auto task_one = [&ran_tasks] { ran_tasks += "1"; };
-
-  runner.PostTask(task_one);
-  runner.RequestStopSoon();
-
-  EXPECT_EQ(ran_tasks, "");
-  runner.RunUntilIdleForTesting();
-
-  EXPECT_EQ(ran_tasks, "1");
+  EXPECT_EQ(result, "");
+  runner.RunUntilStopped();
+  // All posted tasks will execute because RequestStopSoon() guarantees all
+  // immediately-runnable tasks will run before exiting, even if new
+  // immediately-runnable tasks are posted in the meantime.
+  EXPECT_EQ(result, "Alice says goodbye and is not forgotten.");
 }
 
 TEST(TaskRunnerImplTest, TaskRunnerIsStableWithLotsOfTasks) {
@@ -210,7 +194,8 @@
     runner.PostTask(task);
   }
 
-  runner.RunUntilIdleForTesting();
+  runner.RequestStopSoon();
+  runner.RunUntilStopped();
   EXPECT_EQ(ran_tasks, expected_ran_tasks);
 }
 
@@ -224,7 +209,8 @@
   runner.PostTaskWithDelay(delayed_task, milliseconds(10000));
   runner.PostTask(task);
 
-  runner.RunUntilIdleForTesting();
+  runner.RequestStopSoon();
+  runner.RunUntilStopped();
   // The immediate task should have run, even though the delayed task
   // was added first.