blob: c80258811fca2ca88e1c0cfbd80ecaba1866784b [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
13#include <fcntl.h>
14#include <string.h>
15#include <unistd.h>
16
17#include "base/third_party/libevent/event.h"
18#include "webrtc/base/checks.h"
19#include "webrtc/base/logging.h"
20#include "webrtc/base/task_queue_posix.h"
21#include "webrtc/base/timeutils.h"
22
23namespace rtc {
24using internal::GetQueuePtrTls;
25using internal::AutoSetCurrentQueuePtr;
26
27namespace {
28static const char kQuit = 1;
29static const char kRunTask = 2;
30
31struct TimerEvent {
32 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 : task(std::move(task)) {}
34 ~TimerEvent() { event_del(&ev); }
35 event ev;
36 std::unique_ptr<QueuedTask> task;
37};
38
39bool SetNonBlocking(int fd) {
40 const int flags = fcntl(fd, F_GETFL);
41 RTC_CHECK(flags != -1);
42 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
43}
tommi1666b612016-07-13 10:58:12 -070044
45// TODO(tommi): This is a hack to support two versions of libevent that we're
46// compatible with. The method we really want to call is event_assign(),
47// since event_set() has been marked as deprecated (and doesn't accept
48// passing event_base__ as a parameter). However, the version of libevent
49// that we have in Chromium, doesn't have event_assign(), so we need to call
50// event_set() there.
51void EventAssign(struct event* ev,
52 struct event_base* base,
53 int fd,
54 short events,
55 void (*callback)(int, short, void*),
56 void* arg) {
57#if defined(_EVENT2_EVENT_H_)
58 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
59#else
60 event_set(ev, fd, events, callback, arg);
61 RTC_CHECK_EQ(0, event_base_set(base, ev));
62#endif
63}
tommic06b1332016-05-14 11:31:40 -070064} // namespace
65
66struct TaskQueue::QueueContext {
67 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
68 TaskQueue* queue;
69 bool is_active;
70 // Holds a list of events pending timers for cleanup when the loop exits.
71 std::list<TimerEvent*> pending_timers_;
72};
73
74class TaskQueue::PostAndReplyTask : public QueuedTask {
75 public:
76 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
77 std::unique_ptr<QueuedTask> reply,
78 TaskQueue* reply_queue)
79 : task_(std::move(task)),
80 reply_(std::move(reply)),
81 reply_queue_(reply_queue) {
82 reply_queue->PrepareReplyTask(this);
83 }
84
85 ~PostAndReplyTask() override {
86 CritScope lock(&lock_);
87 if (reply_queue_)
88 reply_queue_->ReplyTaskDone(this);
89 }
90
91 void OnReplyQueueGone() {
92 CritScope lock(&lock_);
93 reply_queue_ = nullptr;
94 }
95
96 private:
97 bool Run() override {
98 if (!task_->Run())
99 task_.release();
100
101 CritScope lock(&lock_);
102 if (reply_queue_)
103 reply_queue_->PostTask(std::move(reply_));
104 return true;
105 }
106
107 CriticalSection lock_;
108 std::unique_ptr<QueuedTask> task_;
109 std::unique_ptr<QueuedTask> reply_;
110 TaskQueue* reply_queue_ GUARDED_BY(lock_);
111};
112
113class TaskQueue::SetTimerTask : public QueuedTask {
114 public:
115 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
116 : task_(std::move(task)),
117 milliseconds_(milliseconds),
118 posted_(Time32()) {}
119
120 private:
121 bool Run() override {
122 // Compensate for the time that has passed since construction
123 // and until we got here.
124 uint32_t post_time = Time32() - posted_;
125 TaskQueue::Current()->PostDelayedTask(
126 std::move(task_),
127 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
128 return true;
129 }
130
131 std::unique_ptr<QueuedTask> task_;
132 const uint32_t milliseconds_;
133 const uint32_t posted_;
134};
135
136TaskQueue::TaskQueue(const char* queue_name)
137 : event_base_(event_base_new()),
138 wakeup_event_(new event()),
139 thread_(&TaskQueue::ThreadMain, this, queue_name) {
140 RTC_DCHECK(queue_name);
141 int fds[2];
142 RTC_CHECK(pipe(fds) == 0);
143 SetNonBlocking(fds[0]);
144 SetNonBlocking(fds[1]);
145 wakeup_pipe_out_ = fds[0];
146 wakeup_pipe_in_ = fds[1];
tommi1666b612016-07-13 10:58:12 -0700147 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
148 EV_READ | EV_PERSIST, OnWakeup, this);
tommic06b1332016-05-14 11:31:40 -0700149 event_add(wakeup_event_.get(), 0);
150 thread_.Start();
151}
152
153TaskQueue::~TaskQueue() {
154 RTC_DCHECK(!IsCurrent());
155 struct timespec ts;
156 char message = kQuit;
157 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
158 // The queue is full, so we have no choice but to wait and retry.
159 RTC_CHECK_EQ(EAGAIN, errno);
160 ts.tv_sec = 0;
161 ts.tv_nsec = 1000000;
162 nanosleep(&ts, nullptr);
163 }
164
165 thread_.Stop();
166
167 event_del(wakeup_event_.get());
168 close(wakeup_pipe_in_);
169 close(wakeup_pipe_out_);
170 wakeup_pipe_in_ = -1;
171 wakeup_pipe_out_ = -1;
172
173 {
174 // Synchronize against any pending reply tasks that might be running on
175 // other queues.
176 CritScope lock(&pending_lock_);
177 for (auto* reply : pending_replies_)
178 reply->OnReplyQueueGone();
179 pending_replies_.clear();
180 }
181
182 event_base_free(event_base_);
183}
184
185// static
186TaskQueue* TaskQueue::Current() {
187 QueueContext* ctx =
188 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
189 return ctx ? ctx->queue : nullptr;
190}
191
192// static
193bool TaskQueue::IsCurrent(const char* queue_name) {
194 TaskQueue* current = Current();
195 return current && current->thread_.name().compare(queue_name) == 0;
196}
197
198bool TaskQueue::IsCurrent() const {
199 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
200}
201
202void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
203 RTC_DCHECK(task.get());
204 // libevent isn't thread safe. This means that we can't use methods such
205 // as event_base_once to post tasks to the worker thread from a different
206 // thread. However, we can use it when posting from the worker thread itself.
207 if (IsCurrent()) {
208 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
209 task.get(), nullptr) == 0) {
210 task.release();
211 }
212 } else {
213 QueuedTask* task_id = task.get(); // Only used for comparison.
214 {
215 CritScope lock(&pending_lock_);
216 pending_.push_back(std::move(task));
217 }
218 char message = kRunTask;
219 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
220 LOG(WARNING) << "Failed to queue task.";
221 CritScope lock(&pending_lock_);
222 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
223 return t.get() == task_id;
224 });
225 }
226 }
227}
228
229void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
230 uint32_t milliseconds) {
231 if (IsCurrent()) {
232 TimerEvent* timer = new TimerEvent(std::move(task));
tommi1666b612016-07-13 10:58:12 -0700233 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
tommic06b1332016-05-14 11:31:40 -0700234 QueueContext* ctx =
235 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
236 ctx->pending_timers_.push_back(timer);
237 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
238 event_add(&timer->ev, &tv);
239 } else {
240 PostTask(std::unique_ptr<QueuedTask>(
241 new SetTimerTask(std::move(task), milliseconds)));
242 }
243}
244
245void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
246 std::unique_ptr<QueuedTask> reply,
247 TaskQueue* reply_queue) {
248 std::unique_ptr<QueuedTask> wrapper_task(
249 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
250 PostTask(std::move(wrapper_task));
251}
252
253void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
254 std::unique_ptr<QueuedTask> reply) {
255 return PostTaskAndReply(std::move(task), std::move(reply), Current());
256}
257
258// static
tommi0f8b4032017-02-22 11:22:05 -0800259void TaskQueue::ThreadMain(void* context) {
tommic06b1332016-05-14 11:31:40 -0700260 TaskQueue* me = static_cast<TaskQueue*>(context);
261
262 QueueContext queue_context(me);
263 pthread_setspecific(GetQueuePtrTls(), &queue_context);
264
265 while (queue_context.is_active)
266 event_base_loop(me->event_base_, 0);
267
268 pthread_setspecific(GetQueuePtrTls(), nullptr);
269
270 for (TimerEvent* timer : queue_context.pending_timers_)
271 delete timer;
tommic06b1332016-05-14 11:31:40 -0700272}
273
274// static
275void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
276 QueueContext* ctx =
277 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
278 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
279 char buf;
280 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
281 switch (buf) {
282 case kQuit:
283 ctx->is_active = false;
284 event_base_loopbreak(ctx->queue->event_base_);
285 break;
286 case kRunTask: {
287 std::unique_ptr<QueuedTask> task;
288 {
289 CritScope lock(&ctx->queue->pending_lock_);
290 RTC_DCHECK(!ctx->queue->pending_.empty());
291 task = std::move(ctx->queue->pending_.front());
292 ctx->queue->pending_.pop_front();
293 RTC_DCHECK(task.get());
294 }
295 if (!task->Run())
296 task.release();
297 break;
298 }
299 default:
300 RTC_NOTREACHED();
301 break;
302 }
303}
304
305// static
306void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
307 auto* task = static_cast<QueuedTask*>(context);
308 if (task->Run())
309 delete task;
310}
311
312// static
313void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
314 TimerEvent* timer = static_cast<TimerEvent*>(context);
315 if (!timer->task->Run())
316 timer->task.release();
317 QueueContext* ctx =
318 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
319 ctx->pending_timers_.remove(timer);
320 delete timer;
321}
322
323void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
324 RTC_DCHECK(reply_task);
325 CritScope lock(&pending_lock_);
326 pending_replies_.push_back(reply_task);
327}
328
329void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
330 CritScope lock(&pending_lock_);
331 pending_replies_.remove(reply_task);
332}
333
334} // namespace rtc