Apply add'l feedback to the TaskRunner
Peter was able to add additional feedback to the initial TaskRunner
patch that wasn't addressed before check-in, so submitting it as a
separate patch here. This patch mostly fixes up comments, and changes
syntax in a few places.
Change-Id: Ie959a2ce4e47a23e9a1cb7d2da25511071c34f12
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1556386
Reviewed-by: Peter Thatcher <pthatcher@google.com>
Commit-Queue: Jordan Bayles <jophba@chromium.org>
diff --git a/api/impl/task_runner_impl.cc b/api/impl/task_runner_impl.cc
index 1aa5b33..ad061c2 100644
--- a/api/impl/task_runner_impl.cc
+++ b/api/impl/task_runner_impl.cc
@@ -11,9 +11,7 @@
TaskRunnerImpl::TaskRunnerImpl(platform::ClockNowFunctionPtr now_function)
: now_function_(now_function), is_running_(false) {}
-TaskRunnerImpl::~TaskRunnerImpl() {
- RequestStopSoon();
-}
+TaskRunnerImpl::~TaskRunnerImpl() = default;
void TaskRunnerImpl::PostTask(Task task) {
std::lock_guard<std::mutex> lock(task_mutex_);
@@ -50,10 +48,13 @@
void TaskRunnerImpl::RunCurrentTasksForTesting() {
std::deque<Task> current_tasks;
-
- std::unique_lock<std::mutex> lock(task_mutex_);
- tasks_.swap(current_tasks);
- lock.unlock();
+ {
+ // Unlike in the RunCurrentTasksBlocking method, here we just immediately
+ // take the lock and drain the tasks_ queue. This allows tests to avoid
+ // having to do any multithreading to interact with the queue.
+ std::unique_lock<std::mutex> lock(task_mutex_);
+ tasks_.swap(current_tasks);
+ }
for (Task& task : current_tasks) {
task();
@@ -62,10 +63,16 @@
void TaskRunnerImpl::RunCurrentTasksBlocking() {
std::deque<Task> current_tasks;
+ {
+ // Wait for the lock. If there are no current tasks, we will wait until
+ // a delayed task is ready or a task gets added to the queue.
+ auto lock = WaitForWorkAndAcquireLock();
+ if (!lock) {
+ return;
+ }
- auto lock = WaitForWorkAndAcquireLock();
- tasks_.swap(current_tasks);
- lock.unlock();
+ tasks_.swap(current_tasks);
+ }
for (Task& task : current_tasks) {
task();
@@ -85,27 +92,39 @@
// Getting the time can be expensive on some platforms, so only get it once.
const auto current_time = now_function_();
while (!delayed_tasks_.empty() &&
- (delayed_tasks_.top().time_runnable_after < current_time)) {
+ (delayed_tasks_.top().runnable_after < current_time)) {
tasks_.push_back(std::move(delayed_tasks_.top().task));
delayed_tasks_.pop();
}
}
std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
+ // These checks are redundant, as they are a subset of predicates in the
+ // below wait predicate. However, this is more readable and a slight
+ // optimization, as we don't need to take a lock if we aren't running.
+ if (!is_running_) {
+ return {};
+ }
+
std::unique_lock<std::mutex> lock(task_mutex_);
+ if (!tasks_.empty()) {
+ return lock;
+ }
// Pass a wait predicate to avoid lost or spurious wakeups.
const auto wait_predicate = [this] {
// Either we got woken up because we aren't running
// (probably just to end the thread), or we are running and have tasks to
- // do.
+ // do or schedule.
return !this->is_running_ || !this->tasks_.empty() ||
!this->delayed_tasks_.empty();
};
+ // TODO(jophba): find a predicate method that is compatible with our
+ // fake clock, for easier use with testing.
// We don't have any work to do currently, but know we have some in the pipe.
if (!delayed_tasks_.empty()) {
- run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().time_runnable_after,
+ run_loop_wakeup_.wait_until(lock, delayed_tasks_.top().runnable_after,
wait_predicate);
// We don't have any work queued.