Reland of New task queueing primitive for async tasks: TaskQueue.
New task queueing primitive for async tasks: TaskQueue.
TaskQueue is a new way to asynchronously execute tasks sequentially
in a thread safe manner with minimal locking. The implementation
uses OS supported APIs to do this that are compatible with async IO
notifications from things like sockets and files.
This class is a part of rtc_base_approved, so can be used by both
the webrtc and libjingle parts of the WebRTC library. Moving forward,
we can replace rtc::Thread and webrtc::ProcessThread with this implementation.
NOTE: It should not be assumed that all tasks that execute on a TaskQueue,
run on the same thread. E.g. on Mac and iOS, we use GCD dispatch queues
which means that tasks might execute on different threads depending on
what's the most efficient thing to do.
TBR=perkj@webrtc.org,phoglund@webrtc.org
Review-Url: https://codereview.webrtc.org/1984503002
Cr-Commit-Position: refs/heads/master@{#12749}
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
new file mode 100644
index 0000000..a59b450
--- /dev/null
+++ b/webrtc/base/task_queue_libevent.cc
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/base/task_queue.h"
+
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "base/third_party/libevent/event.h"
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/base/task_queue_posix.h"
+#include "webrtc/base/timeutils.h"
+
+namespace rtc {
+using internal::GetQueuePtrTls;
+using internal::AutoSetCurrentQueuePtr;
+
+namespace {
+static const char kQuit = 1;
+static const char kRunTask = 2;
+
+struct TimerEvent {
+ explicit TimerEvent(std::unique_ptr<QueuedTask> task)
+ : task(std::move(task)) {}
+ ~TimerEvent() { event_del(&ev); }
+ event ev;
+ std::unique_ptr<QueuedTask> task;
+};
+
+bool SetNonBlocking(int fd) {
+ const int flags = fcntl(fd, F_GETFL);
+ RTC_CHECK(flags != -1);
+ return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
+}
+} // namespace
+
+struct TaskQueue::QueueContext {
+ explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
+ TaskQueue* queue;
+ bool is_active;
+ // Holds a list of events pending timers for cleanup when the loop exits.
+ std::list<TimerEvent*> pending_timers_;
+};
+
+class TaskQueue::PostAndReplyTask : public QueuedTask {
+ public:
+ PostAndReplyTask(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue* reply_queue)
+ : task_(std::move(task)),
+ reply_(std::move(reply)),
+ reply_queue_(reply_queue) {
+ reply_queue->PrepareReplyTask(this);
+ }
+
+ ~PostAndReplyTask() override {
+ CritScope lock(&lock_);
+ if (reply_queue_)
+ reply_queue_->ReplyTaskDone(this);
+ }
+
+ void OnReplyQueueGone() {
+ CritScope lock(&lock_);
+ reply_queue_ = nullptr;
+ }
+
+ private:
+ bool Run() override {
+ if (!task_->Run())
+ task_.release();
+
+ CritScope lock(&lock_);
+ if (reply_queue_)
+ reply_queue_->PostTask(std::move(reply_));
+ return true;
+ }
+
+ CriticalSection lock_;
+ std::unique_ptr<QueuedTask> task_;
+ std::unique_ptr<QueuedTask> reply_;
+ TaskQueue* reply_queue_ GUARDED_BY(lock_);
+};
+
+class TaskQueue::SetTimerTask : public QueuedTask {
+ public:
+ SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
+ : task_(std::move(task)),
+ milliseconds_(milliseconds),
+ posted_(Time32()) {}
+
+ private:
+ bool Run() override {
+ // Compensate for the time that has passed since construction
+ // and until we got here.
+ uint32_t post_time = Time32() - posted_;
+ TaskQueue::Current()->PostDelayedTask(
+ std::move(task_),
+ post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
+ return true;
+ }
+
+ std::unique_ptr<QueuedTask> task_;
+ const uint32_t milliseconds_;
+ const uint32_t posted_;
+};
+
+TaskQueue::TaskQueue(const char* queue_name)
+ : event_base_(event_base_new()),
+ wakeup_event_(new event()),
+ thread_(&TaskQueue::ThreadMain, this, queue_name) {
+ RTC_DCHECK(queue_name);
+ int fds[2];
+ RTC_CHECK(pipe(fds) == 0);
+ SetNonBlocking(fds[0]);
+ SetNonBlocking(fds[1]);
+ wakeup_pipe_out_ = fds[0];
+ wakeup_pipe_in_ = fds[1];
+ event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
+ OnWakeup, this);
+ event_base_set(event_base_, wakeup_event_.get());
+ event_add(wakeup_event_.get(), 0);
+ thread_.Start();
+}
+
+TaskQueue::~TaskQueue() {
+ RTC_DCHECK(!IsCurrent());
+ struct timespec ts;
+ char message = kQuit;
+ while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+ // The queue is full, so we have no choice but to wait and retry.
+ RTC_CHECK_EQ(EAGAIN, errno);
+ ts.tv_sec = 0;
+ ts.tv_nsec = 1000000;
+ nanosleep(&ts, nullptr);
+ }
+
+ thread_.Stop();
+
+ event_del(wakeup_event_.get());
+ close(wakeup_pipe_in_);
+ close(wakeup_pipe_out_);
+ wakeup_pipe_in_ = -1;
+ wakeup_pipe_out_ = -1;
+
+ {
+ // Synchronize against any pending reply tasks that might be running on
+ // other queues.
+ CritScope lock(&pending_lock_);
+ for (auto* reply : pending_replies_)
+ reply->OnReplyQueueGone();
+ pending_replies_.clear();
+ }
+
+ event_base_free(event_base_);
+}
+
+// static
+TaskQueue* TaskQueue::Current() {
+ QueueContext* ctx =
+ static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+ return ctx ? ctx->queue : nullptr;
+}
+
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+ TaskQueue* current = Current();
+ return current && current->thread_.name().compare(queue_name) == 0;
+}
+
+bool TaskQueue::IsCurrent() const {
+ return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+ RTC_DCHECK(task.get());
+ // libevent isn't thread safe. This means that we can't use methods such
+ // as event_base_once to post tasks to the worker thread from a different
+ // thread. However, we can use it when posting from the worker thread itself.
+ if (IsCurrent()) {
+ if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
+ task.get(), nullptr) == 0) {
+ task.release();
+ }
+ } else {
+ QueuedTask* task_id = task.get(); // Only used for comparison.
+ {
+ CritScope lock(&pending_lock_);
+ pending_.push_back(std::move(task));
+ }
+ char message = kRunTask;
+ if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+ LOG(WARNING) << "Failed to queue task.";
+ CritScope lock(&pending_lock_);
+ pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
+ return t.get() == task_id;
+ });
+ }
+ }
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
+ if (IsCurrent()) {
+ TimerEvent* timer = new TimerEvent(std::move(task));
+ evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
+ event_base_set(event_base_, &timer->ev);
+ QueueContext* ctx =
+ static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+ ctx->pending_timers_.push_back(timer);
+ timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
+ event_add(&timer->ev, &tv);
+ } else {
+ PostTask(std::unique_ptr<QueuedTask>(
+ new SetTimerTask(std::move(task), milliseconds)));
+ }
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue* reply_queue) {
+ std::unique_ptr<QueuedTask> wrapper_task(
+ new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+ PostTask(std::move(wrapper_task));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply) {
+ return PostTaskAndReply(std::move(task), std::move(reply), Current());
+}
+
+// static
+bool TaskQueue::ThreadMain(void* context) {
+ TaskQueue* me = static_cast<TaskQueue*>(context);
+
+ QueueContext queue_context(me);
+ pthread_setspecific(GetQueuePtrTls(), &queue_context);
+
+ while (queue_context.is_active)
+ event_base_loop(me->event_base_, 0);
+
+ pthread_setspecific(GetQueuePtrTls(), nullptr);
+
+ for (TimerEvent* timer : queue_context.pending_timers_)
+ delete timer;
+
+ return false;
+}
+
+// static
+void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
+ QueueContext* ctx =
+ static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+ RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
+ char buf;
+ RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
+ switch (buf) {
+ case kQuit:
+ ctx->is_active = false;
+ event_base_loopbreak(ctx->queue->event_base_);
+ break;
+ case kRunTask: {
+ std::unique_ptr<QueuedTask> task;
+ {
+ CritScope lock(&ctx->queue->pending_lock_);
+ RTC_DCHECK(!ctx->queue->pending_.empty());
+ task = std::move(ctx->queue->pending_.front());
+ ctx->queue->pending_.pop_front();
+ RTC_DCHECK(task.get());
+ }
+ if (!task->Run())
+ task.release();
+ break;
+ }
+ default:
+ RTC_NOTREACHED();
+ break;
+ }
+}
+
+// static
+void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
+ auto* task = static_cast<QueuedTask*>(context);
+ if (task->Run())
+ delete task;
+}
+
+// static
+void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
+ TimerEvent* timer = static_cast<TimerEvent*>(context);
+ if (!timer->task->Run())
+ timer->task.release();
+ QueueContext* ctx =
+ static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+ ctx->pending_timers_.remove(timer);
+ delete timer;
+}
+
+void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+ RTC_DCHECK(reply_task);
+ CritScope lock(&pending_lock_);
+ pending_replies_.push_back(reply_task);
+}
+
+void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
+ CritScope lock(&pending_lock_);
+ pending_replies_.remove(reply_task);
+}
+
+} // namespace rtc