blob: db267dc80be893aa7715f5dea97ed2a57a93f47c [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
kjellandere96c45b2017-06-30 10:45:21 -070011#include "webrtc/rtc_base/task_queue.h"
tommic06b1332016-05-14 11:31:40 -070012
13#include <fcntl.h>
tommi8c80c6e2017-02-23 00:34:52 -080014#include <signal.h>
tommic06b1332016-05-14 11:31:40 -070015#include <string.h>
16#include <unistd.h>
17
18#include "base/third_party/libevent/event.h"
kjellandere96c45b2017-06-30 10:45:21 -070019#include "webrtc/rtc_base/checks.h"
20#include "webrtc/rtc_base/logging.h"
kwiberg5b9746e2017-08-16 04:52:35 -070021#include "webrtc/rtc_base/safe_conversions.h"
kjellandere96c45b2017-06-30 10:45:21 -070022#include "webrtc/rtc_base/task_queue_posix.h"
23#include "webrtc/rtc_base/timeutils.h"
tommic06b1332016-05-14 11:31:40 -070024
25namespace rtc {
26using internal::GetQueuePtrTls;
27using internal::AutoSetCurrentQueuePtr;
28
29namespace {
// Single-byte messages written to a queue's wakeup pipe. The queue's thread
// reads them one at a time in OnWakeup() and dispatches on the value.
constexpr char kQuit = 1;          // Stop the event loop.
constexpr char kRunTask = 2;       // Run the task at the front of |pending_|.
constexpr char kRunReplyTask = 3;  // Run/delete a pending reply task.
33
tommic9bb7912017-02-24 10:42:14 -080034using Priority = TaskQueue::Priority;
35
// Blocks the SIGPIPE signal on the calling thread.
// SIGPIPE can be fired when trying to write() to a pipe that's being closed
// or while closing a pipe that's being written to. We can run into that
// situation (e.g. reply tasks that don't get a chance to run because the task
// queue is being deleted), so we ignore the signal and continue as normal.
// By default SIGPIPE terminates the process, which we don't want to risk.
//
// Side note: it would be great if we could safely restore the previous signal
// mask afterwards, but restoring it can itself cause SIGPIPE to be signaled
// (e.g. on MacOS), so the mask is left in place.
//
// An alternative to this approach is to ignore the signal for the whole
// process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  // The old mask is deliberately not captured (nullptr); see the note above.
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
tommic06b1332016-05-14 11:31:40 -070056
57struct TimerEvent {
58 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
59 : task(std::move(task)) {}
60 ~TimerEvent() { event_del(&ev); }
61 event ev;
62 std::unique_ptr<QueuedTask> task;
63};
64
65bool SetNonBlocking(int fd) {
66 const int flags = fcntl(fd, F_GETFL);
67 RTC_CHECK(flags != -1);
68 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
69}
tommi1666b612016-07-13 10:58:12 -070070
71// TODO(tommi): This is a hack to support two versions of libevent that we're
72// compatible with. The method we really want to call is event_assign(),
73// since event_set() has been marked as deprecated (and doesn't accept
74// passing event_base__ as a parameter). However, the version of libevent
75// that we have in Chromium, doesn't have event_assign(), so we need to call
76// event_set() there.
77void EventAssign(struct event* ev,
78 struct event_base* base,
79 int fd,
80 short events,
81 void (*callback)(int, short, void*),
82 void* arg) {
83#if defined(_EVENT2_EVENT_H_)
84 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
85#else
86 event_set(ev, fd, events, callback, arg);
87 RTC_CHECK_EQ(0, event_base_set(base, ev));
88#endif
89}
tommic9bb7912017-02-24 10:42:14 -080090
91ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
92 switch (priority) {
93 case Priority::HIGH:
94 return kRealtimePriority;
95 case Priority::LOW:
96 return kLowPriority;
97 case Priority::NORMAL:
98 return kNormalPriority;
99 default:
100 RTC_NOTREACHED();
101 break;
102 }
103 return kNormalPriority;
104}
tommic06b1332016-05-14 11:31:40 -0700105} // namespace
106
107struct TaskQueue::QueueContext {
108 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
109 TaskQueue* queue;
110 bool is_active;
111 // Holds a list of events pending timers for cleanup when the loop exits.
112 std::list<TimerEvent*> pending_timers_;
113};
114
tommi8c80c6e2017-02-23 00:34:52 -0800115// Posting a reply task is tricky business. This class owns the reply task
116// and a reference to it is held by both the reply queue and the first task.
117// Here's an outline of what happens when dealing with a reply task.
118// * The ReplyTaskOwner owns the |reply_| task.
119// * One ref owned by PostAndReplyTask
120// * One ref owned by the reply TaskQueue
121// * ReplyTaskOwner has a flag |run_task_| initially set to false.
122// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
123// * After successfully running the original |task_|, PostAndReplyTask() calls
124// set_should_run_task(). This sets |run_task_| to true.
125// * In PostAndReplyTask's dtor:
126// * It releases its reference to ReplyTaskOwner (important to do this first).
127// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
128// * PostAndReplyTask doesn't care if write() fails, but when it does:
129// * The reply queue is gone.
130// * ReplyTaskOwner has already been deleted and the reply task too.
131// * If write() succeeds:
132// * ReplyQueue receives the kRunReplyTask message
133// * Goes through all pending tasks, finding the first that HasOneRef()
134// * Calls ReplyTaskOwner::Run()
135// * if set_should_run_task() was called, the reply task will be run
136// * Release the reference to ReplyTaskOwner
137// * ReplyTaskOwner and associated |reply_| are deleted.
138class TaskQueue::ReplyTaskOwner {
139 public:
140 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
141 : reply_(std::move(reply)) {}
142
143 void Run() {
144 RTC_DCHECK(reply_);
145 if (run_task_) {
146 if (!reply_->Run())
147 reply_.release();
148 }
149 reply_.reset();
150 }
151
152 void set_should_run_task() {
153 RTC_DCHECK(!run_task_);
154 run_task_ = true;
155 }
156
157 private:
158 std::unique_ptr<QueuedTask> reply_;
159 bool run_task_ = false;
160};
161
tommic06b1332016-05-14 11:31:40 -0700162class TaskQueue::PostAndReplyTask : public QueuedTask {
163 public:
164 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
165 std::unique_ptr<QueuedTask> reply,
tommi8c80c6e2017-02-23 00:34:52 -0800166 TaskQueue* reply_queue,
167 int reply_pipe)
tommic06b1332016-05-14 11:31:40 -0700168 : task_(std::move(task)),
tommi8c80c6e2017-02-23 00:34:52 -0800169 reply_pipe_(reply_pipe),
170 reply_task_owner_(
171 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
172 reply_queue->PrepareReplyTask(reply_task_owner_);
tommic06b1332016-05-14 11:31:40 -0700173 }
174
175 ~PostAndReplyTask() override {
tommi8c80c6e2017-02-23 00:34:52 -0800176 reply_task_owner_ = nullptr;
177 IgnoreSigPipeSignalOnCurrentThread();
178 // Send a signal to the reply queue that the reply task can run now.
179 // Depending on whether |set_should_run_task()| was called by the
180 // PostAndReplyTask(), the reply task may or may not actually run.
181 // In either case, it will be deleted.
182 char message = kRunReplyTask;
183 write(reply_pipe_, &message, sizeof(message));
tommic06b1332016-05-14 11:31:40 -0700184 }
185
186 private:
187 bool Run() override {
188 if (!task_->Run())
189 task_.release();
tommi8c80c6e2017-02-23 00:34:52 -0800190 reply_task_owner_->set_should_run_task();
tommic06b1332016-05-14 11:31:40 -0700191 return true;
192 }
193
tommic06b1332016-05-14 11:31:40 -0700194 std::unique_ptr<QueuedTask> task_;
tommi8c80c6e2017-02-23 00:34:52 -0800195 int reply_pipe_;
196 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
tommic06b1332016-05-14 11:31:40 -0700197};
198
199class TaskQueue::SetTimerTask : public QueuedTask {
200 public:
201 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
202 : task_(std::move(task)),
203 milliseconds_(milliseconds),
204 posted_(Time32()) {}
205
206 private:
207 bool Run() override {
208 // Compensate for the time that has passed since construction
209 // and until we got here.
210 uint32_t post_time = Time32() - posted_;
211 TaskQueue::Current()->PostDelayedTask(
212 std::move(task_),
213 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
214 return true;
215 }
216
217 std::unique_ptr<QueuedTask> task_;
218 const uint32_t milliseconds_;
219 const uint32_t posted_;
220};
221
tommic9bb7912017-02-24 10:42:14 -0800222TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
tommic06b1332016-05-14 11:31:40 -0700223 : event_base_(event_base_new()),
224 wakeup_event_(new event()),
tommic9bb7912017-02-24 10:42:14 -0800225 thread_(&TaskQueue::ThreadMain,
226 this,
227 queue_name,
228 TaskQueuePriorityToThreadPriority(priority)) {
tommic06b1332016-05-14 11:31:40 -0700229 RTC_DCHECK(queue_name);
230 int fds[2];
231 RTC_CHECK(pipe(fds) == 0);
232 SetNonBlocking(fds[0]);
233 SetNonBlocking(fds[1]);
234 wakeup_pipe_out_ = fds[0];
235 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800236
tommi1666b612016-07-13 10:58:12 -0700237 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
238 EV_READ | EV_PERSIST, OnWakeup, this);
tommic06b1332016-05-14 11:31:40 -0700239 event_add(wakeup_event_.get(), 0);
240 thread_.Start();
241}
242
243TaskQueue::~TaskQueue() {
244 RTC_DCHECK(!IsCurrent());
245 struct timespec ts;
246 char message = kQuit;
247 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
248 // The queue is full, so we have no choice but to wait and retry.
249 RTC_CHECK_EQ(EAGAIN, errno);
250 ts.tv_sec = 0;
251 ts.tv_nsec = 1000000;
252 nanosleep(&ts, nullptr);
253 }
254
255 thread_.Stop();
256
257 event_del(wakeup_event_.get());
tommi8c80c6e2017-02-23 00:34:52 -0800258
259 IgnoreSigPipeSignalOnCurrentThread();
260
tommic06b1332016-05-14 11:31:40 -0700261 close(wakeup_pipe_in_);
262 close(wakeup_pipe_out_);
263 wakeup_pipe_in_ = -1;
264 wakeup_pipe_out_ = -1;
265
tommic06b1332016-05-14 11:31:40 -0700266 event_base_free(event_base_);
267}
268
269// static
270TaskQueue* TaskQueue::Current() {
271 QueueContext* ctx =
272 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
273 return ctx ? ctx->queue : nullptr;
274}
275
276// static
277bool TaskQueue::IsCurrent(const char* queue_name) {
278 TaskQueue* current = Current();
279 return current && current->thread_.name().compare(queue_name) == 0;
280}
281
282bool TaskQueue::IsCurrent() const {
283 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
284}
285
286void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
287 RTC_DCHECK(task.get());
288 // libevent isn't thread safe. This means that we can't use methods such
289 // as event_base_once to post tasks to the worker thread from a different
290 // thread. However, we can use it when posting from the worker thread itself.
291 if (IsCurrent()) {
292 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
293 task.get(), nullptr) == 0) {
294 task.release();
295 }
296 } else {
297 QueuedTask* task_id = task.get(); // Only used for comparison.
298 {
299 CritScope lock(&pending_lock_);
300 pending_.push_back(std::move(task));
301 }
302 char message = kRunTask;
303 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
304 LOG(WARNING) << "Failed to queue task.";
305 CritScope lock(&pending_lock_);
306 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
307 return t.get() == task_id;
308 });
309 }
310 }
311}
312
313void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
314 uint32_t milliseconds) {
315 if (IsCurrent()) {
316 TimerEvent* timer = new TimerEvent(std::move(task));
tommi1666b612016-07-13 10:58:12 -0700317 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
tommic06b1332016-05-14 11:31:40 -0700318 QueueContext* ctx =
319 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
320 ctx->pending_timers_.push_back(timer);
kwiberg5b9746e2017-08-16 04:52:35 -0700321 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
322 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
tommic06b1332016-05-14 11:31:40 -0700323 event_add(&timer->ev, &tv);
324 } else {
325 PostTask(std::unique_ptr<QueuedTask>(
326 new SetTimerTask(std::move(task), milliseconds)));
327 }
328}
329
330void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
331 std::unique_ptr<QueuedTask> reply,
332 TaskQueue* reply_queue) {
333 std::unique_ptr<QueuedTask> wrapper_task(
tommi8c80c6e2017-02-23 00:34:52 -0800334 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
335 reply_queue->wakeup_pipe_in_));
tommic06b1332016-05-14 11:31:40 -0700336 PostTask(std::move(wrapper_task));
337}
338
339void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
340 std::unique_ptr<QueuedTask> reply) {
341 return PostTaskAndReply(std::move(task), std::move(reply), Current());
342}
343
344// static
tommi0f8b4032017-02-22 11:22:05 -0800345void TaskQueue::ThreadMain(void* context) {
tommic06b1332016-05-14 11:31:40 -0700346 TaskQueue* me = static_cast<TaskQueue*>(context);
347
348 QueueContext queue_context(me);
349 pthread_setspecific(GetQueuePtrTls(), &queue_context);
350
351 while (queue_context.is_active)
352 event_base_loop(me->event_base_, 0);
353
354 pthread_setspecific(GetQueuePtrTls(), nullptr);
355
356 for (TimerEvent* timer : queue_context.pending_timers_)
357 delete timer;
tommic06b1332016-05-14 11:31:40 -0700358}
359
360// static
361void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
362 QueueContext* ctx =
363 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
364 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
365 char buf;
366 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
367 switch (buf) {
368 case kQuit:
369 ctx->is_active = false;
370 event_base_loopbreak(ctx->queue->event_base_);
371 break;
372 case kRunTask: {
373 std::unique_ptr<QueuedTask> task;
374 {
375 CritScope lock(&ctx->queue->pending_lock_);
376 RTC_DCHECK(!ctx->queue->pending_.empty());
377 task = std::move(ctx->queue->pending_.front());
378 ctx->queue->pending_.pop_front();
379 RTC_DCHECK(task.get());
380 }
381 if (!task->Run())
382 task.release();
383 break;
384 }
tommi8c80c6e2017-02-23 00:34:52 -0800385 case kRunReplyTask: {
386 scoped_refptr<ReplyTaskOwnerRef> reply_task;
387 {
388 CritScope lock(&ctx->queue->pending_lock_);
389 for (auto it = ctx->queue->pending_replies_.begin();
390 it != ctx->queue->pending_replies_.end(); ++it) {
391 if ((*it)->HasOneRef()) {
392 reply_task = std::move(*it);
393 ctx->queue->pending_replies_.erase(it);
394 break;
395 }
396 }
397 }
398 reply_task->Run();
399 break;
400 }
tommic06b1332016-05-14 11:31:40 -0700401 default:
402 RTC_NOTREACHED();
403 break;
404 }
405}
406
407// static
408void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
409 auto* task = static_cast<QueuedTask*>(context);
410 if (task->Run())
411 delete task;
412}
413
414// static
415void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
416 TimerEvent* timer = static_cast<TimerEvent*>(context);
417 if (!timer->task->Run())
418 timer->task.release();
419 QueueContext* ctx =
420 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
421 ctx->pending_timers_.remove(timer);
422 delete timer;
423}
424
tommi8c80c6e2017-02-23 00:34:52 -0800425void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
tommic06b1332016-05-14 11:31:40 -0700426 RTC_DCHECK(reply_task);
427 CritScope lock(&pending_lock_);
tommi8c80c6e2017-02-23 00:34:52 -0800428 pending_replies_.push_back(std::move(reply_task));
tommic06b1332016-05-14 11:31:40 -0700429}
430
431} // namespace rtc