TaskRunnerImpl stops by posting a "quit task" to itself.
This changes the behavior of RequestStopSoon(): Before, it performed a
synchronized, immediate stop of the run loop. Now, it simply posts a
"quit task," and the run loop stops when it comes across that task in
the queue of "run-now" tasks.
Rationale: It greatly simplifies applications (e.g., demos) and the unit
tests if the task runner guarantees that all previously-posted
(non-delayed) tasks have run before it stops. Specifically, an
application can post tasks to a task runner that executes on another
thread, then post the "quit task," and then only needs to join on the
thread to know that everything that had to be run has completed.
This change also allowed for a ton of simplification in the unit tests:
For example, TaskRunnerImpl::RunUntilIdleForTesting() is no longer
needed, and was removed. The tests are now using only the API that is
available to regular, non-test code.
Change-Id: I6df1cc917cb5299526a2cdc32a2c48fe44fd0951
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1820181
Reviewed-by: Jordan Bayles <jophba@chromium.org>
Reviewed-by: Ryan Keane <rwkeane@google.com>
Reviewed-by: Yuri Wiitala <miu@chromium.org>
Commit-Queue: Yuri Wiitala <miu@chromium.org>
diff --git a/platform/impl/task_runner.cc b/platform/impl/task_runner.cc
index 3e957a1..a03842e 100644
--- a/platform/impl/task_runner.cc
+++ b/platform/impl/task_runner.cc
@@ -48,51 +48,39 @@
}
void TaskRunnerImpl::RunUntilStopped() {
+ OSP_CHECK(!is_running_);
task_runner_thread_id_ = std::this_thread::get_id();
- const bool was_running = is_running_.exchange(true);
- OSP_CHECK(!was_running);
+ is_running_ = true;
- RunTasksUntilStopped();
+ // Main loop: Run until the |is_running_| flag is set back to false by the
+ // "quit task" posted by RequestStopSoon().
+ while (is_running_) {
+ ScheduleDelayedTasks();
+ RunTasksAfterWaiting();
+ }
+
+ // Flushing phase: Ensure all immediately-runnable tasks are run before
+ // returning. Since running some tasks might cause more immediately-runnable
+ // tasks to be posted, loop until there is no more work.
+ {
+ std::unique_lock<std::mutex> lock(task_mutex_);
+ while (!tasks_.empty()) {
+ OSP_DCHECK(running_tasks_.empty());
+ running_tasks_.swap(tasks_);
+ lock.unlock();
+ RunRunnableTasks();
+ lock.lock();
+ }
+ }
+
+ task_runner_thread_id_ = std::thread::id();
}
void TaskRunnerImpl::RequestStopSoon() {
- const bool was_running = is_running_.exchange(false);
-
- if (was_running) {
- OSP_DVLOG << "Requesting stop...";
- if (task_waiter_) {
- task_waiter_->OnTaskPosted();
- } else {
- std::lock_guard<std::mutex> lock(task_mutex_);
- run_loop_wakeup_.notify_one();
- }
- }
+ PostTask([this]() { is_running_ = false; });
}
-void TaskRunnerImpl::RunUntilIdleForTesting() {
- ScheduleDelayedTasks();
- RunCurrentTasksForTesting();
-}
-
-void TaskRunnerImpl::RunCurrentTasksForTesting() {
- {
- // Unlike in the RunCurrentTasksBlocking method, here we just immediately
- // take the lock and drain the tasks_ queue. This allows tests to avoid
- // having to do any multithreading to interact with the queue.
- std::unique_lock<std::mutex> lock(task_mutex_);
- OSP_DCHECK(running_tasks_.empty());
- running_tasks_.swap(tasks_);
- }
-
- for (TaskWithMetadata& task : running_tasks_) {
- // Move the task to the stack so that its bound state is freed immediately
- // after being run.
- std::move(task)();
- }
- running_tasks_.clear();
-}
-
-void TaskRunnerImpl::RunCurrentTasksBlocking() {
+void TaskRunnerImpl::RunTasksAfterWaiting() {
{
// Wait for the lock. If there are no current tasks, we will wait until
// a delayed task is ready or a task gets added to the queue.
@@ -105,6 +93,10 @@
running_tasks_.swap(tasks_);
}
+ RunRunnableTasks();
+}
+
+void TaskRunnerImpl::RunRunnableTasks() {
OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
for (TaskWithMetadata& running_task : running_tasks_) {
// Move the task to the stack so that its bound state is freed immediately
@@ -115,14 +107,6 @@
running_tasks_.clear();
}
-void TaskRunnerImpl::RunTasksUntilStopped() {
- while (is_running_) {
- ScheduleDelayedTasks();
- RunCurrentTasksBlocking();
- }
- task_runner_thread_id_ = std::thread::id();
-}
-
void TaskRunnerImpl::ScheduleDelayedTasks() {
std::lock_guard<std::mutex> lock(task_mutex_);
@@ -149,14 +133,6 @@
}
std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
- // These checks are redundant, as they are a subset of predicates in the
- // below wait predicate. However, this is more readable and a slight
- // optimization, as we don't need to take a lock if we aren't running.
- if (!is_running_) {
- OSP_DVLOG << "TaskRunnerImpl not running. Returning empty lock.";
- return {};
- }
-
std::unique_lock<std::mutex> lock(task_mutex_);
if (!tasks_.empty()) {
OSP_DVLOG << "TaskRunnerImpl lock acquired.";
@@ -179,19 +155,16 @@
} while (!ShouldWakeUpRunLoop());
} else {
// Pass a wait predicate to avoid lost or spurious wakeups.
+ const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
if (!delayed_tasks_.empty()) {
// We don't have any work to do currently, but have some in the
// pipe.
- const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
run_loop_wakeup_.wait_for(lock,
delayed_tasks_.begin()->first - now_function_(),
wait_predicate);
} else {
// We don't have any work queued.
- const auto wait_predicate = [this] {
- return !delayed_tasks_.empty() || ShouldWakeUpRunLoop();
- };
OSP_DVLOG << "TaskRunnerImpl waiting for lock...";
run_loop_wakeup_.wait(lock, wait_predicate);
}