blob: 94848f3c349fa6748347dc5a461bcbee44c2f62b [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
13#include <fcntl.h>
14#include <string.h>
15#include <unistd.h>
16
17#include "base/third_party/libevent/event.h"
18#include "webrtc/base/checks.h"
19#include "webrtc/base/logging.h"
20#include "webrtc/base/task_queue_posix.h"
21#include "webrtc/base/timeutils.h"
22
23namespace rtc {
24using internal::GetQueuePtrTls;
25using internal::AutoSetCurrentQueuePtr;
26
27namespace {
28static const char kQuit = 1;
29static const char kRunTask = 2;
30
31struct TimerEvent {
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 : task(std::move(task)) {}
34 ~TimerEvent() { event_del(&ev); }
35 event ev;
36 std::unique_ptr<QueuedTask> task;
37};
38
39bool SetNonBlocking(int fd) {
40 const int flags = fcntl(fd, F_GETFL);
41 RTC_CHECK(flags != -1);
42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
43}
44} // namespace
45
46struct TaskQueue::QueueContext {
47 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
48 TaskQueue* queue;
49 bool is_active;
50 // Holds a list of events pending timers for cleanup when the loop exits.
51 std::list<TimerEvent*> pending_timers_;
52};
53
54class TaskQueue::PostAndReplyTask : public QueuedTask {
55 public:
56 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
57 std::unique_ptr<QueuedTask> reply,
58 TaskQueue* reply_queue)
59 : task_(std::move(task)),
60 reply_(std::move(reply)),
61 reply_queue_(reply_queue) {
62 reply_queue->PrepareReplyTask(this);
63 }
64
65 ~PostAndReplyTask() override {
66 CritScope lock(&lock_);
67 if (reply_queue_)
68 reply_queue_->ReplyTaskDone(this);
69 }
70
71 void OnReplyQueueGone() {
72 CritScope lock(&lock_);
73 reply_queue_ = nullptr;
74 }
75
76 private:
77 bool Run() override {
78 if (!task_->Run())
79 task_.release();
80
81 CritScope lock(&lock_);
82 if (reply_queue_)
83 reply_queue_->PostTask(std::move(reply_));
84 return true;
85 }
86
87 CriticalSection lock_;
88 std::unique_ptr<QueuedTask> task_;
89 std::unique_ptr<QueuedTask> reply_;
90 TaskQueue* reply_queue_ GUARDED_BY(lock_);
91};
92
93class TaskQueue::SetTimerTask : public QueuedTask {
94 public:
95 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
96 : task_(std::move(task)),
97 milliseconds_(milliseconds),
98 posted_(Time32()) {}
99
100 private:
101 bool Run() override {
102 // Compensate for the time that has passed since construction
103 // and until we got here.
104 uint32_t post_time = Time32() - posted_;
105 TaskQueue::Current()->PostDelayedTask(
106 std::move(task_),
107 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
108 return true;
109 }
110
111 std::unique_ptr<QueuedTask> task_;
112 const uint32_t milliseconds_;
113 const uint32_t posted_;
114};
115
116TaskQueue::TaskQueue(const char* queue_name)
117 : event_base_(event_base_new()),
118 wakeup_event_(new event()),
119 thread_(&TaskQueue::ThreadMain, this, queue_name) {
120 RTC_DCHECK(queue_name);
121 int fds[2];
122 RTC_CHECK(pipe(fds) == 0);
123 SetNonBlocking(fds[0]);
124 SetNonBlocking(fds[1]);
125 wakeup_pipe_out_ = fds[0];
126 wakeup_pipe_in_ = fds[1];
tommi7b020da2016-07-13 00:56:40 -0700127 // TODO(tommi): This is a hack to support two versions of libevent that we're
128 // compatible with. The method we really want to call is event_assign(),
129 // since event_set() has been marked as deprecated (and doesn't accept
130 // passing event_base__ as a parameter). However, the version of libevent
131 // that we have in Chromium, doesn't have event_assign(), so we need to call
132 // event_set() there.
133#if defined(_EVENT2_EVENT_H_)
134 event_assign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
135 EV_READ | EV_PERSIST, OnWakeup, this);
136#else
tommic06b1332016-05-14 11:31:40 -0700137 event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
138 OnWakeup, this);
tommi7b020da2016-07-13 00:56:40 -0700139#endif
tommic06b1332016-05-14 11:31:40 -0700140 event_base_set(event_base_, wakeup_event_.get());
141 event_add(wakeup_event_.get(), 0);
142 thread_.Start();
143}
144
145TaskQueue::~TaskQueue() {
146 RTC_DCHECK(!IsCurrent());
147 struct timespec ts;
148 char message = kQuit;
149 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
150 // The queue is full, so we have no choice but to wait and retry.
151 RTC_CHECK_EQ(EAGAIN, errno);
152 ts.tv_sec = 0;
153 ts.tv_nsec = 1000000;
154 nanosleep(&ts, nullptr);
155 }
156
157 thread_.Stop();
158
159 event_del(wakeup_event_.get());
160 close(wakeup_pipe_in_);
161 close(wakeup_pipe_out_);
162 wakeup_pipe_in_ = -1;
163 wakeup_pipe_out_ = -1;
164
165 {
166 // Synchronize against any pending reply tasks that might be running on
167 // other queues.
168 CritScope lock(&pending_lock_);
169 for (auto* reply : pending_replies_)
170 reply->OnReplyQueueGone();
171 pending_replies_.clear();
172 }
173
174 event_base_free(event_base_);
175}
176
177// static
178TaskQueue* TaskQueue::Current() {
179 QueueContext* ctx =
180 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
181 return ctx ? ctx->queue : nullptr;
182}
183
184// static
185bool TaskQueue::IsCurrent(const char* queue_name) {
186 TaskQueue* current = Current();
187 return current && current->thread_.name().compare(queue_name) == 0;
188}
189
190bool TaskQueue::IsCurrent() const {
191 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
192}
193
194void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
195 RTC_DCHECK(task.get());
196 // libevent isn't thread safe. This means that we can't use methods such
197 // as event_base_once to post tasks to the worker thread from a different
198 // thread. However, we can use it when posting from the worker thread itself.
199 if (IsCurrent()) {
200 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
201 task.get(), nullptr) == 0) {
202 task.release();
203 }
204 } else {
205 QueuedTask* task_id = task.get(); // Only used for comparison.
206 {
207 CritScope lock(&pending_lock_);
208 pending_.push_back(std::move(task));
209 }
210 char message = kRunTask;
211 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
212 LOG(WARNING) << "Failed to queue task.";
213 CritScope lock(&pending_lock_);
214 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
215 return t.get() == task_id;
216 });
217 }
218 }
219}
220
221void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
222 uint32_t milliseconds) {
223 if (IsCurrent()) {
224 TimerEvent* timer = new TimerEvent(std::move(task));
225 evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
226 event_base_set(event_base_, &timer->ev);
227 QueueContext* ctx =
228 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
229 ctx->pending_timers_.push_back(timer);
230 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
231 event_add(&timer->ev, &tv);
232 } else {
233 PostTask(std::unique_ptr<QueuedTask>(
234 new SetTimerTask(std::move(task), milliseconds)));
235 }
236}
237
238void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
239 std::unique_ptr<QueuedTask> reply,
240 TaskQueue* reply_queue) {
241 std::unique_ptr<QueuedTask> wrapper_task(
242 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
243 PostTask(std::move(wrapper_task));
244}
245
246void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
247 std::unique_ptr<QueuedTask> reply) {
248 return PostTaskAndReply(std::move(task), std::move(reply), Current());
249}
250
251// static
252bool TaskQueue::ThreadMain(void* context) {
253 TaskQueue* me = static_cast<TaskQueue*>(context);
254
255 QueueContext queue_context(me);
256 pthread_setspecific(GetQueuePtrTls(), &queue_context);
257
258 while (queue_context.is_active)
259 event_base_loop(me->event_base_, 0);
260
261 pthread_setspecific(GetQueuePtrTls(), nullptr);
262
263 for (TimerEvent* timer : queue_context.pending_timers_)
264 delete timer;
265
266 return false;
267}
268
269// static
270void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
271 QueueContext* ctx =
272 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
273 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
274 char buf;
275 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
276 switch (buf) {
277 case kQuit:
278 ctx->is_active = false;
279 event_base_loopbreak(ctx->queue->event_base_);
280 break;
281 case kRunTask: {
282 std::unique_ptr<QueuedTask> task;
283 {
284 CritScope lock(&ctx->queue->pending_lock_);
285 RTC_DCHECK(!ctx->queue->pending_.empty());
286 task = std::move(ctx->queue->pending_.front());
287 ctx->queue->pending_.pop_front();
288 RTC_DCHECK(task.get());
289 }
290 if (!task->Run())
291 task.release();
292 break;
293 }
294 default:
295 RTC_NOTREACHED();
296 break;
297 }
298}
299
300// static
301void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
302 auto* task = static_cast<QueuedTask*>(context);
303 if (task->Run())
304 delete task;
305}
306
307// static
308void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
309 TimerEvent* timer = static_cast<TimerEvent*>(context);
310 if (!timer->task->Run())
311 timer->task.release();
312 QueueContext* ctx =
313 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
314 ctx->pending_timers_.remove(timer);
315 delete timer;
316}
317
318void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
319 RTC_DCHECK(reply_task);
320 CritScope lock(&pending_lock_);
321 pending_replies_.push_back(reply_task);
322}
323
324void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
325 CritScope lock(&pending_lock_);
326 pending_replies_.remove(reply_task);
327}
328
329} // namespace rtc