TaskRunner: Replace use of std::function with std::packaged_task.
The task runner is meant to run a "task" once. Thus, client code should
expect that it can std::bind() with move-only types (e.g.,
std::unique_ptr<T>) as arguments. However, the TaskRunner uses
std::function, which only works when all std::bind() arguments are
copyable.
The solution is to replace std::function with std::packaged_task, the
latter of which is purposely designed to be a run-once variant of
std::function; and it also supports move-only types in the bind state.
This change also cleans up a few other details around the switch:
1. std::packaged_task uses an explicit ctor, whereas std::function
provided an implicit one. Thus, TaskRunner::PostTask() has become a
templated member function, an inline wrapper around calling the
std::packaged_task ctor explicitly.
2. TaskRunnerImpl's use of std::priority_queue for delayed tasks was
replaced with std::multimap, because the API of the former did not
provide non-const access to its elements (which was required for move
operations).
3. Minor tweaks around comments and resource/allocation concerns in
touched code.
Change-Id: I5fe9a2e3a3c2d8d69b564575f81c4a7f4a80b265
Reviewed-on: https://chromium-review.googlesource.com/c/openscreen/+/1646564
Commit-Queue: Yuri Wiitala <miu@chromium.org>
Reviewed-by: Ryan Keane <rwkeane@google.com>
Reviewed-by: Max Yakimakha <yakimakha@chromium.org>
diff --git a/platform/base/task_runner_impl.cc b/platform/base/task_runner_impl.cc
index 6c36315..9ac1c78 100644
--- a/platform/base/task_runner_impl.cc
+++ b/platform/base/task_runner_impl.cc
@@ -19,7 +19,7 @@
TaskRunnerImpl::~TaskRunnerImpl() = default;
-void TaskRunnerImpl::PostTask(Task task) {
+void TaskRunnerImpl::PostPackagedTask(Task task) {
std::lock_guard<std::mutex> lock(task_mutex_);
tasks_.push_back(std::move(task));
if (task_waiter_) {
@@ -29,9 +29,11 @@
}
}
-void TaskRunnerImpl::PostTaskWithDelay(Task task, Clock::duration delay) {
+void TaskRunnerImpl::PostPackagedTaskWithDelay(Task task,
+ Clock::duration delay) {
std::lock_guard<std::mutex> lock(task_mutex_);
- delayed_tasks_.emplace(std::move(task), now_function_() + delay);
+ delayed_tasks_.emplace(
+ std::make_pair(now_function_() + delay, std::move(task)));
if (task_waiter_) {
task_waiter_->OnTaskPosted();
} else {
@@ -66,22 +68,24 @@
}
void TaskRunnerImpl::RunCurrentTasksForTesting() {
- std::deque<Task> current_tasks;
{
// Unlike in the RunCurrentTasksBlocking method, here we just immediately
// take the lock and drain the tasks_ queue. This allows tests to avoid
// having to do any multithreading to interact with the queue.
std::unique_lock<std::mutex> lock(task_mutex_);
- tasks_.swap(current_tasks);
+ OSP_DCHECK(running_tasks_.empty());
+ running_tasks_.swap(tasks_);
}
- for (Task& task : current_tasks) {
- task();
+ for (Task& task : running_tasks_) {
+ // Move the task to the stack so that its bound state is freed immediately
+ // after being run.
+ std::move(task)();
}
+ running_tasks_.clear();
}
void TaskRunnerImpl::RunCurrentTasksBlocking() {
- std::deque<Task> current_tasks;
{
// Wait for the lock. If there are no current tasks, we will wait until
// a delayed task is ready or a task gets added to the queue.
@@ -90,13 +94,17 @@
return;
}
- tasks_.swap(current_tasks);
+ OSP_DCHECK(running_tasks_.empty());
+ running_tasks_.swap(tasks_);
}
- for (Task& task : current_tasks) {
- OSP_DVLOG << "Running " << current_tasks.size() << " current tasks...";
- task();
+ OSP_DVLOG << "Running " << running_tasks_.size() << " tasks...";
+ for (Task& task : running_tasks_) {
+ // Move the task to the stack so that its bound state is freed immediately
+ // after being run.
+ std::move(task)();
}
+ running_tasks_.clear();
}
void TaskRunnerImpl::RunTasksUntilStopped() {
@@ -111,11 +119,11 @@
// Getting the time can be expensive on some platforms, so only get it once.
const auto current_time = now_function_();
- while (!delayed_tasks_.empty() &&
- (delayed_tasks_.top().runnable_after <= current_time)) {
- tasks_.push_back(std::move(delayed_tasks_.top().task));
- delayed_tasks_.pop();
+ const auto end_of_range = delayed_tasks_.upper_bound(current_time);
+ for (auto it = delayed_tasks_.begin(); it != end_of_range; ++it) {
+ tasks_.push_back(std::move(it->second));
}
+ delayed_tasks_.erase(delayed_tasks_.begin(), end_of_range);
}
bool TaskRunnerImpl::ShouldWakeUpRunLoop() {
@@ -128,7 +136,7 @@
}
return !delayed_tasks_.empty() &&
- (delayed_tasks_.top().runnable_after <= now_function_());
+ (delayed_tasks_.begin()->first <= now_function_());
}
std::unique_lock<std::mutex> TaskRunnerImpl::WaitForWorkAndAcquireLock() {
@@ -151,7 +159,7 @@
Clock::duration timeout = waiter_timeout_;
if (!delayed_tasks_.empty()) {
Clock::duration next_task_delta =
- delayed_tasks_.top().runnable_after - now_function_();
+ delayed_tasks_.begin()->first - now_function_();
if (next_task_delta < timeout) {
timeout = next_task_delta;
}
@@ -167,9 +175,9 @@
// pipe.
const auto wait_predicate = [this] { return ShouldWakeUpRunLoop(); };
OSP_DVLOG << "TaskRunner waiting for lock until delayed task ready...";
- run_loop_wakeup_.wait_for(
- lock, delayed_tasks_.top().runnable_after - now_function_(),
- wait_predicate);
+ run_loop_wakeup_.wait_for(lock,
+ delayed_tasks_.begin()->first - now_function_(),
+ wait_predicate);
} else {
// We don't have any work queued.
const auto wait_predicate = [this] {