/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
13#include <fcntl.h>
14#include <string.h>
15#include <unistd.h>
16
17#include "base/third_party/libevent/event.h"
18#include "webrtc/base/checks.h"
19#include "webrtc/base/logging.h"
20#include "webrtc/base/task_queue_posix.h"
21#include "webrtc/base/timeutils.h"
22
23namespace rtc {
24using internal::GetQueuePtrTls;
25using internal::AutoSetCurrentQueuePtr;
26
27namespace {
// Single-byte messages written to the wakeup pipe and decoded by OnWakeup.
// kQuit makes the worker thread leave its event loop; kRunTask tells it that a
// task has been appended to the pending list.
constexpr char kQuit = 1;
constexpr char kRunTask = 2;
30
31struct TimerEvent {
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 : task(std::move(task)) {}
34 ~TimerEvent() { event_del(&ev); }
35 event ev;
36 std::unique_ptr<QueuedTask> task;
37};
38
39bool SetNonBlocking(int fd) {
40 const int flags = fcntl(fd, F_GETFL);
41 RTC_CHECK(flags != -1);
42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
43}
44} // namespace
45
46struct TaskQueue::QueueContext {
47 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
48 TaskQueue* queue;
49 bool is_active;
50 // Holds a list of events pending timers for cleanup when the loop exits.
51 std::list<TimerEvent*> pending_timers_;
52};
53
54class TaskQueue::PostAndReplyTask : public QueuedTask {
55 public:
56 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
57 std::unique_ptr<QueuedTask> reply,
58 TaskQueue* reply_queue)
59 : task_(std::move(task)),
60 reply_(std::move(reply)),
61 reply_queue_(reply_queue) {
62 reply_queue->PrepareReplyTask(this);
63 }
64
65 ~PostAndReplyTask() override {
66 CritScope lock(&lock_);
67 if (reply_queue_)
68 reply_queue_->ReplyTaskDone(this);
69 }
70
71 void OnReplyQueueGone() {
72 CritScope lock(&lock_);
73 reply_queue_ = nullptr;
74 }
75
76 private:
77 bool Run() override {
78 if (!task_->Run())
79 task_.release();
80
81 CritScope lock(&lock_);
82 if (reply_queue_)
83 reply_queue_->PostTask(std::move(reply_));
84 return true;
85 }
86
87 CriticalSection lock_;
88 std::unique_ptr<QueuedTask> task_;
89 std::unique_ptr<QueuedTask> reply_;
90 TaskQueue* reply_queue_ GUARDED_BY(lock_);
91};
92
93class TaskQueue::SetTimerTask : public QueuedTask {
94 public:
95 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
96 : task_(std::move(task)),
97 milliseconds_(milliseconds),
98 posted_(Time32()) {}
99
100 private:
101 bool Run() override {
102 // Compensate for the time that has passed since construction
103 // and until we got here.
104 uint32_t post_time = Time32() - posted_;
105 TaskQueue::Current()->PostDelayedTask(
106 std::move(task_),
107 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
108 return true;
109 }
110
111 std::unique_ptr<QueuedTask> task_;
112 const uint32_t milliseconds_;
113 const uint32_t posted_;
114};
115
116TaskQueue::TaskQueue(const char* queue_name)
117 : event_base_(event_base_new()),
118 wakeup_event_(new event()),
119 thread_(&TaskQueue::ThreadMain, this, queue_name) {
120 RTC_DCHECK(queue_name);
121 int fds[2];
122 RTC_CHECK(pipe(fds) == 0);
123 SetNonBlocking(fds[0]);
124 SetNonBlocking(fds[1]);
125 wakeup_pipe_out_ = fds[0];
126 wakeup_pipe_in_ = fds[1];
127 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
128 OnWakeup, this);
129 event_base_set(event_base_, wakeup_event_.get());
130 event_add(wakeup_event_.get(), 0);
131 thread_.Start();
132}
133
134TaskQueue::~TaskQueue() {
135 RTC_DCHECK(!IsCurrent());
136 struct timespec ts;
137 char message = kQuit;
138 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
139 // The queue is full, so we have no choice but to wait and retry.
140 RTC_CHECK_EQ(EAGAIN, errno);
141 ts.tv_sec = 0;
142 ts.tv_nsec = 1000000;
143 nanosleep(&ts, nullptr);
144 }
145
146 thread_.Stop();
147
148 event_del(wakeup_event_.get());
149 close(wakeup_pipe_in_);
150 close(wakeup_pipe_out_);
151 wakeup_pipe_in_ = -1;
152 wakeup_pipe_out_ = -1;
153
154 {
155 // Synchronize against any pending reply tasks that might be running on
156 // other queues.
157 CritScope lock(&pending_lock_);
158 for (auto* reply : pending_replies_)
159 reply->OnReplyQueueGone();
160 pending_replies_.clear();
161 }
162
163 event_base_free(event_base_);
164}
165
166// static
167TaskQueue* TaskQueue::Current() {
168 QueueContext* ctx =
169 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
170 return ctx ? ctx->queue : nullptr;
171}
172
173// static
174bool TaskQueue::IsCurrent(const char* queue_name) {
175 TaskQueue* current = Current();
176 return current && current->thread_.name().compare(queue_name) == 0;
177}
178
179bool TaskQueue::IsCurrent() const {
180 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
181}
182
183void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
184 RTC_DCHECK(task.get());
185 // libevent isn't thread safe. This means that we can't use methods such
186 // as event_base_once to post tasks to the worker thread from a different
187 // thread. However, we can use it when posting from the worker thread itself.
188 if (IsCurrent()) {
189 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
190 task.get(), nullptr) == 0) {
191 task.release();
192 }
193 } else {
194 QueuedTask* task_id = task.get(); // Only used for comparison.
195 {
196 CritScope lock(&pending_lock_);
197 pending_.push_back(std::move(task));
198 }
199 char message = kRunTask;
200 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
201 LOG(WARNING) << "Failed to queue task.";
202 CritScope lock(&pending_lock_);
203 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
204 return t.get() == task_id;
205 });
206 }
207 }
208}
209
210void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
211 uint32_t milliseconds) {
212 if (IsCurrent()) {
213 TimerEvent* timer = new TimerEvent(std::move(task));
214 evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
215 event_base_set(event_base_, &timer->ev);
216 QueueContext* ctx =
217 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
218 ctx->pending_timers_.push_back(timer);
219 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
220 event_add(&timer->ev, &tv);
221 } else {
222 PostTask(std::unique_ptr<QueuedTask>(
223 new SetTimerTask(std::move(task), milliseconds)));
224 }
225}
226
227void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
228 std::unique_ptr<QueuedTask> reply,
229 TaskQueue* reply_queue) {
230 std::unique_ptr<QueuedTask> wrapper_task(
231 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
232 PostTask(std::move(wrapper_task));
233}
234
235void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
236 std::unique_ptr<QueuedTask> reply) {
237 return PostTaskAndReply(std::move(task), std::move(reply), Current());
238}
239
240// static
241bool TaskQueue::ThreadMain(void* context) {
242 TaskQueue* me = static_cast<TaskQueue*>(context);
243
244 QueueContext queue_context(me);
245 pthread_setspecific(GetQueuePtrTls(), &queue_context);
246
247 while (queue_context.is_active)
248 event_base_loop(me->event_base_, 0);
249
250 pthread_setspecific(GetQueuePtrTls(), nullptr);
251
252 for (TimerEvent* timer : queue_context.pending_timers_)
253 delete timer;
254
255 return false;
256}
257
258// static
259void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
260 QueueContext* ctx =
261 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
262 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
263 char buf;
264 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
265 switch (buf) {
266 case kQuit:
267 ctx->is_active = false;
268 event_base_loopbreak(ctx->queue->event_base_);
269 break;
270 case kRunTask: {
271 std::unique_ptr<QueuedTask> task;
272 {
273 CritScope lock(&ctx->queue->pending_lock_);
274 RTC_DCHECK(!ctx->queue->pending_.empty());
275 task = std::move(ctx->queue->pending_.front());
276 ctx->queue->pending_.pop_front();
277 RTC_DCHECK(task.get());
278 }
279 if (!task->Run())
280 task.release();
281 break;
282 }
283 default:
284 RTC_NOTREACHED();
285 break;
286 }
287}
288
289// static
290void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
291 auto* task = static_cast<QueuedTask*>(context);
292 if (task->Run())
293 delete task;
294}
295
296// static
297void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
298 TimerEvent* timer = static_cast<TimerEvent*>(context);
299 if (!timer->task->Run())
300 timer->task.release();
301 QueueContext* ctx =
302 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
303 ctx->pending_timers_.remove(timer);
304 delete timer;
305}
306
307void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
308 RTC_DCHECK(reply_task);
309 CritScope lock(&pending_lock_);
310 pending_replies_.push_back(reply_task);
311}
312
313void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
314 CritScope lock(&pending_lock_);
315 pending_replies_.remove(reply_task);
316}
317
318} // namespace rtc