Migrate gcd task queue implementation to TaskQueueBase interface
Bug: webrtc:10191
Change-Id: If15138f97445484668d3e42f3a35875521c38545
Reviewed-on: https://webrtc-review.googlesource.com/c/122501
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26782}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 9809a8e..ff8af81 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -534,18 +534,18 @@
if (is_mac || is_ios) {
rtc_source_set("rtc_task_queue_gcd") {
- visibility = [ ":rtc_task_queue_impl" ]
+ visibility = [ "../api/task_queue:default_task_queue_factory_impl" ]
sources = [
"task_queue_gcd.cc",
- "task_queue_posix.cc",
- "task_queue_posix.h",
+ "task_queue_gcd.h",
]
deps = [
":checks",
":logging",
- ":refcount",
- ":rtc_task_queue_api",
- "../api:scoped_refptr",
+ "../api/task_queue",
+ "../api/task_queue:task_queue_factory",
+ "//third_party/abseil-cpp/absl/memory",
+ "//third_party/abseil-cpp/absl/strings",
]
}
}
@@ -594,17 +594,12 @@
rtc_source_set("rtc_task_queue_impl") {
visibility = [ "*" ]
- if (rtc_enable_libevent) {
+ if (rtc_enable_libevent || is_mac || is_ios) {
deps = [
"../api/task_queue:default_task_queue_factory_impl",
"../api/task_queue:global_task_queue_factory",
]
} else {
- if (is_mac || is_ios) {
- deps = [
- ":rtc_task_queue_gcd",
- ]
- }
if (is_win) {
if (current_os == "winuwp") {
deps = [
diff --git a/rtc_base/task_queue_gcd.cc b/rtc_base/task_queue_gcd.cc
index 405edab..c131d82 100644
--- a/rtc_base/task_queue_gcd.cc
+++ b/rtc_base/task_queue_gcd.cc
@@ -12,171 +12,142 @@
// The implementation uses Grand Central Dispatch queues (GCD) to
// do the actual task queuing.
-#include "rtc_base/task_queue.h"
+#include "rtc_base/task_queue_gcd.h"
#include <string.h>
#include <dispatch/dispatch.h>
+#include "absl/memory/memory.h"
+#include "absl/strings/string_view.h"
+#include "api/task_queue/queued_task.h"
+#include "api/task_queue/task_queue_base.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
-#include "rtc_base/ref_count.h"
-#include "rtc_base/ref_counted_object.h"
-#include "rtc_base/task_queue_posix.h"
-namespace rtc {
+namespace webrtc {
namespace {
-using Priority = TaskQueue::Priority;
-
-int TaskQueuePriorityToGCD(Priority priority) {
+int TaskQueuePriorityToGCD(TaskQueueFactory::Priority priority) {
switch (priority) {
- case Priority::NORMAL:
+ case TaskQueueFactory::Priority::NORMAL:
return DISPATCH_QUEUE_PRIORITY_DEFAULT;
- case Priority::HIGH:
+ case TaskQueueFactory::Priority::HIGH:
return DISPATCH_QUEUE_PRIORITY_HIGH;
- case Priority::LOW:
+ case TaskQueueFactory::Priority::LOW:
return DISPATCH_QUEUE_PRIORITY_LOW;
}
}
-} // namespace
-using internal::GetQueuePtrTls;
-using internal::AutoSetCurrentQueuePtr;
-
-class TaskQueue::Impl : public RefCountInterface {
+class TaskQueueGcd : public TaskQueueBase {
public:
- Impl(const char* queue_name, TaskQueue* task_queue, Priority priority);
- ~Impl() override;
+ TaskQueueGcd(absl::string_view queue_name, int gcd_priority);
- static TaskQueue* Current();
-
- // Used for DCHECKing the current queue.
- bool IsCurrent() const;
-
- void PostTask(std::unique_ptr<QueuedTask> task);
- void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+ void Delete() override;
+ void PostTask(std::unique_ptr<QueuedTask> task) override;
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) override;
private:
- struct QueueContext {
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
-
- static void SetNotActive(void* context) {
- QueueContext* qc = static_cast<QueueContext*>(context);
- qc->is_active = false;
- }
-
- static void DeleteContext(void* context) {
- QueueContext* qc = static_cast<QueueContext*>(context);
- delete qc;
- }
-
- TaskQueue* const queue;
- bool is_active;
- };
-
struct TaskContext {
- TaskContext(QueueContext* queue_ctx, std::unique_ptr<QueuedTask> task)
- : queue_ctx(queue_ctx), task(std::move(task)) {}
- virtual ~TaskContext() {}
+ TaskContext(TaskQueueGcd* queue, std::unique_ptr<QueuedTask> task)
+ : queue(queue), task(std::move(task)) {}
- static void RunTask(void* context) {
- std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(context));
- if (tc->queue_ctx->is_active) {
- AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue);
- if (!tc->task->Run())
- tc->task.release();
- }
- }
-
- QueueContext* const queue_ctx;
+ TaskQueueGcd* const queue;
std::unique_ptr<QueuedTask> task;
};
+ ~TaskQueueGcd() override;
+ static void RunTask(void* task_context);
+ static void SetNotActive(void* task_queue);
+ static void DeleteQueue(void* task_queue);
+
dispatch_queue_t queue_;
- QueueContext* const context_;
+ bool is_active_;
};
-TaskQueue::Impl::Impl(const char* queue_name,
- TaskQueue* task_queue,
- Priority priority)
- : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)),
- context_(new QueueContext(task_queue)) {
- RTC_DCHECK(queue_name);
+TaskQueueGcd::TaskQueueGcd(absl::string_view queue_name, int gcd_priority)
+ : queue_(dispatch_queue_create(std::string(queue_name).c_str(),
+ DISPATCH_QUEUE_SERIAL)),
+ is_active_(true) {
RTC_CHECK(queue_);
- dispatch_set_context(queue_, context_);
- // Assign a finalizer that will delete the context when the last reference
- // to the queue is released. This may run after the TaskQueue object has
- // been deleted.
- dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext);
+ dispatch_set_context(queue_, this);
+ // Assign a finalizer that will delete the queue when the last reference
+ // is released. This may run after TaskQueueGcd::Delete() has returned.
+ dispatch_set_finalizer_f(queue_, &DeleteQueue);
- dispatch_set_target_queue(
- queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0));
+ dispatch_set_target_queue(queue_, dispatch_get_global_queue(gcd_priority, 0));
}
-TaskQueue::Impl::~Impl() {
+TaskQueueGcd::~TaskQueueGcd() = default;
+
+void TaskQueueGcd::Delete() {
RTC_DCHECK(!IsCurrent());
// Implementation/behavioral note:
// Dispatch queues are reference counted via calls to dispatch_retain and
// dispatch_release. Pending blocks submitted to a queue also hold a
// reference to the queue until they have finished. Once all references to a
// queue have been released, the queue will be deallocated by the system.
- // This is why we check the context before running tasks.
+ // This is why we check is_active_ before running tasks.
- // Use dispatch_sync to set the context to null to guarantee that there's not
- // a race between checking the context and using it from a task.
- dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive);
+ // Use dispatch_sync_f to clear is_active_ on the queue itself, so there is
+ // no race with a task checking it.
+ dispatch_sync_f(queue_, this, &SetNotActive);
dispatch_release(queue_);
}
-// static
-TaskQueue* TaskQueue::Impl::Current() {
- return static_cast<TaskQueue*>(pthread_getspecific(GetQueuePtrTls()));
+void TaskQueueGcd::PostTask(std::unique_ptr<QueuedTask> task) {
+ auto* context = new TaskContext(this, std::move(task));
+ dispatch_async_f(queue_, context, &RunTask);
}
-bool TaskQueue::Impl::IsCurrent() const {
- RTC_DCHECK(queue_);
- const TaskQueue* current = Current();
- return current && this == current->impl_.get();
-}
-
-void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
- auto* context = new TaskContext(context_, std::move(task));
- dispatch_async_f(queue_, context, &TaskContext::RunTask);
-}
-
-void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- auto* context = new TaskContext(context_, std::move(task));
+void TaskQueueGcd::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
+ auto* context = new TaskContext(this, std::move(task));
dispatch_after_f(
dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_,
- context, &TaskContext::RunTask);
+ context, &RunTask);
}
-// Boilerplate for the PIMPL pattern.
-TaskQueue::TaskQueue(const char* queue_name, Priority priority)
- : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
-}
-
-TaskQueue::~TaskQueue() {}
-
// static
-TaskQueue* TaskQueue::Current() {
- return TaskQueue::Impl::Current();
+void TaskQueueGcd::RunTask(void* task_context) {
+ std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(task_context));
+ if (!tc->queue->is_active_)
+ return;
+
+ CurrentTaskQueueSetter set_current(tc->queue);
+ auto* task = tc->task.release();
+ if (task->Run()) {
+ // Delete the task before CurrentTaskQueueSetter clears the state marking
+ // this code as running on the task queue.
+ delete task;
+ }
}
-// Used for DCHECKing the current queue.
-bool TaskQueue::IsCurrent() const {
- return impl_->IsCurrent();
+// static
+void TaskQueueGcd::SetNotActive(void* task_queue) {
+ static_cast<TaskQueueGcd*>(task_queue)->is_active_ = false;
}
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
- return TaskQueue::impl_->PostTask(std::move(task));
+// static
+void TaskQueueGcd::DeleteQueue(void* task_queue) {
+ delete static_cast<TaskQueueGcd*>(task_queue);
}
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
+class TaskQueueGcdFactory final : public TaskQueueFactory {
+ public:
+ std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+ absl::string_view name,
+ Priority priority) const override {
+ return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+ new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority)));
+ }
+};
+
+} // namespace
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory() {
+ return absl::make_unique<TaskQueueGcdFactory>();
}
-} // namespace rtc
+} // namespace webrtc
diff --git a/rtc_base/task_queue_gcd.h b/rtc_base/task_queue_gcd.h
new file mode 100644
index 0000000..dc6039e
--- /dev/null
+++ b/rtc_base/task_queue_gcd.h
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2019 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_TASK_QUEUE_GCD_H_
+#define RTC_BASE_TASK_QUEUE_GCD_H_
+
+#include <memory>
+
+#include "api/task_queue/task_queue_factory.h"
+
+namespace webrtc {
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueGcdFactory();
+
+} // namespace webrtc
+
+#endif // RTC_BASE_TASK_QUEUE_GCD_H_
diff --git a/rtc_base/task_queue_posix.cc b/rtc_base/task_queue_posix.cc
deleted file mode 100644
index 520b8e9..0000000
--- a/rtc_base/task_queue_posix.cc
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "rtc_base/task_queue_posix.h"
-
-#include "rtc_base/checks.h"
-#include "rtc_base/task_queue.h"
-
-namespace rtc {
-namespace internal {
-pthread_key_t g_queue_ptr_tls = 0;
-
-void InitializeTls() {
- RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0);
-}
-
-pthread_key_t GetQueuePtrTls() {
- static pthread_once_t init_once = PTHREAD_ONCE_INIT;
- RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0);
- return g_queue_ptr_tls;
-}
-
-AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q)
- : prev_(TaskQueue::Current()) {
- pthread_setspecific(GetQueuePtrTls(), q);
-}
-
-AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() {
- pthread_setspecific(GetQueuePtrTls(), prev_);
-}
-
-} // namespace internal
-} // namespace rtc
diff --git a/rtc_base/task_queue_posix.h b/rtc_base/task_queue_posix.h
deleted file mode 100644
index 3014e20..0000000
--- a/rtc_base/task_queue_posix.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef RTC_BASE_TASK_QUEUE_POSIX_H_
-#define RTC_BASE_TASK_QUEUE_POSIX_H_
-
-#include <pthread.h>
-
-namespace rtc {
-
-class TaskQueue;
-
-namespace internal {
-
-class AutoSetCurrentQueuePtr {
- public:
- explicit AutoSetCurrentQueuePtr(TaskQueue* q);
- ~AutoSetCurrentQueuePtr();
-
- private:
- TaskQueue* const prev_;
-};
-
-pthread_key_t GetQueuePtrTls();
-
-} // namespace internal
-} // namespace rtc
-
-#endif // RTC_BASE_TASK_QUEUE_POSIX_H_