blob: b1070da7b7973a714186693f8f4e181d56bb74c5 [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
kjellandere96c45b2017-06-30 10:45:21 -070011#include "webrtc/rtc_base/task_queue.h"
tommic06b1332016-05-14 11:31:40 -070012
13#include <fcntl.h>
tommi8c80c6e2017-02-23 00:34:52 -080014#include <signal.h>
tommic06b1332016-05-14 11:31:40 -070015#include <string.h>
16#include <unistd.h>
17
18#include "base/third_party/libevent/event.h"
kjellandere96c45b2017-06-30 10:45:21 -070019#include "webrtc/rtc_base/checks.h"
20#include "webrtc/rtc_base/logging.h"
21#include "webrtc/rtc_base/task_queue_posix.h"
22#include "webrtc/rtc_base/timeutils.h"
tommic06b1332016-05-14 11:31:40 -070023
24namespace rtc {
25using internal::GetQueuePtrTls;
26using internal::AutoSetCurrentQueuePtr;
27
28namespace {
// Single-byte commands written to a queue's wakeup pipe. OnWakeup() reads one
// byte per invocation and dispatches on its value.
constexpr char kQuit = 1;          // Stop the event loop.
constexpr char kRunTask = 2;       // Run the next task in |pending_|.
constexpr char kRunReplyTask = 3;  // Run a reply task that is ready.
32
tommic9bb7912017-02-24 10:42:14 -080033using Priority = TaskQueue::Priority;
34
// Blocks delivery of SIGPIPE to the calling thread.
//
// SIGPIPE can be raised when write() is called on a pipe that is being closed,
// or when a pipe is closed while it is being written to. That situation can
// occur here (e.g. reply tasks that never get a chance to run because the
// task queue is being deleted), so the signal is blocked and execution
// continues as normal.
//
// The previous signal mask is intentionally not restored: restoring it can
// itself cause SIGPIPE to be signaled (e.g. on MacOS), and the default
// disposition of SIGPIPE terminates the process.
//
// An alternative would be to ignore the signal for the whole process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  // The old mask is not requested (nullptr) since it is never restored.
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
tommic06b1332016-05-14 11:31:40 -070055
56struct TimerEvent {
57 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
58 : task(std::move(task)) {}
59 ~TimerEvent() { event_del(&ev); }
60 event ev;
61 std::unique_ptr<QueuedTask> task;
62};
63
64bool SetNonBlocking(int fd) {
65 const int flags = fcntl(fd, F_GETFL);
66 RTC_CHECK(flags != -1);
67 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
68}
tommi1666b612016-07-13 10:58:12 -070069
70// TODO(tommi): This is a hack to support two versions of libevent that we're
71// compatible with. The method we really want to call is event_assign(),
72// since event_set() has been marked as deprecated (and doesn't accept
73// passing event_base__ as a parameter). However, the version of libevent
74// that we have in Chromium, doesn't have event_assign(), so we need to call
75// event_set() there.
76void EventAssign(struct event* ev,
77 struct event_base* base,
78 int fd,
79 short events,
80 void (*callback)(int, short, void*),
81 void* arg) {
82#if defined(_EVENT2_EVENT_H_)
83 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
84#else
85 event_set(ev, fd, events, callback, arg);
86 RTC_CHECK_EQ(0, event_base_set(base, ev));
87#endif
88}
tommic9bb7912017-02-24 10:42:14 -080089
90ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
91 switch (priority) {
92 case Priority::HIGH:
93 return kRealtimePriority;
94 case Priority::LOW:
95 return kLowPriority;
96 case Priority::NORMAL:
97 return kNormalPriority;
98 default:
99 RTC_NOTREACHED();
100 break;
101 }
102 return kNormalPriority;
103}
tommic06b1332016-05-14 11:31:40 -0700104} // namespace
105
106struct TaskQueue::QueueContext {
107 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
108 TaskQueue* queue;
109 bool is_active;
110 // Holds a list of events pending timers for cleanup when the loop exits.
111 std::list<TimerEvent*> pending_timers_;
112};
113
tommi8c80c6e2017-02-23 00:34:52 -0800114// Posting a reply task is tricky business. This class owns the reply task
115// and a reference to it is held by both the reply queue and the first task.
116// Here's an outline of what happens when dealing with a reply task.
117// * The ReplyTaskOwner owns the |reply_| task.
118// * One ref owned by PostAndReplyTask
119// * One ref owned by the reply TaskQueue
120// * ReplyTaskOwner has a flag |run_task_| initially set to false.
121// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
122// * After successfully running the original |task_|, PostAndReplyTask() calls
123// set_should_run_task(). This sets |run_task_| to true.
124// * In PostAndReplyTask's dtor:
125// * It releases its reference to ReplyTaskOwner (important to do this first).
126// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
127// * PostAndReplyTask doesn't care if write() fails, but when it does:
128// * The reply queue is gone.
129// * ReplyTaskOwner has already been deleted and the reply task too.
130// * If write() succeeds:
131// * ReplyQueue receives the kRunReplyTask message
132// * Goes through all pending tasks, finding the first that HasOneRef()
133// * Calls ReplyTaskOwner::Run()
134// * if set_should_run_task() was called, the reply task will be run
135// * Release the reference to ReplyTaskOwner
136// * ReplyTaskOwner and associated |reply_| are deleted.
137class TaskQueue::ReplyTaskOwner {
138 public:
139 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
140 : reply_(std::move(reply)) {}
141
142 void Run() {
143 RTC_DCHECK(reply_);
144 if (run_task_) {
145 if (!reply_->Run())
146 reply_.release();
147 }
148 reply_.reset();
149 }
150
151 void set_should_run_task() {
152 RTC_DCHECK(!run_task_);
153 run_task_ = true;
154 }
155
156 private:
157 std::unique_ptr<QueuedTask> reply_;
158 bool run_task_ = false;
159};
160
tommic06b1332016-05-14 11:31:40 -0700161class TaskQueue::PostAndReplyTask : public QueuedTask {
162 public:
163 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
164 std::unique_ptr<QueuedTask> reply,
tommi8c80c6e2017-02-23 00:34:52 -0800165 TaskQueue* reply_queue,
166 int reply_pipe)
tommic06b1332016-05-14 11:31:40 -0700167 : task_(std::move(task)),
tommi8c80c6e2017-02-23 00:34:52 -0800168 reply_pipe_(reply_pipe),
169 reply_task_owner_(
170 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
171 reply_queue->PrepareReplyTask(reply_task_owner_);
tommic06b1332016-05-14 11:31:40 -0700172 }
173
174 ~PostAndReplyTask() override {
tommi8c80c6e2017-02-23 00:34:52 -0800175 reply_task_owner_ = nullptr;
176 IgnoreSigPipeSignalOnCurrentThread();
177 // Send a signal to the reply queue that the reply task can run now.
178 // Depending on whether |set_should_run_task()| was called by the
179 // PostAndReplyTask(), the reply task may or may not actually run.
180 // In either case, it will be deleted.
181 char message = kRunReplyTask;
182 write(reply_pipe_, &message, sizeof(message));
tommic06b1332016-05-14 11:31:40 -0700183 }
184
185 private:
186 bool Run() override {
187 if (!task_->Run())
188 task_.release();
tommi8c80c6e2017-02-23 00:34:52 -0800189 reply_task_owner_->set_should_run_task();
tommic06b1332016-05-14 11:31:40 -0700190 return true;
191 }
192
tommic06b1332016-05-14 11:31:40 -0700193 std::unique_ptr<QueuedTask> task_;
tommi8c80c6e2017-02-23 00:34:52 -0800194 int reply_pipe_;
195 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
tommic06b1332016-05-14 11:31:40 -0700196};
197
198class TaskQueue::SetTimerTask : public QueuedTask {
199 public:
200 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
201 : task_(std::move(task)),
202 milliseconds_(milliseconds),
203 posted_(Time32()) {}
204
205 private:
206 bool Run() override {
207 // Compensate for the time that has passed since construction
208 // and until we got here.
209 uint32_t post_time = Time32() - posted_;
210 TaskQueue::Current()->PostDelayedTask(
211 std::move(task_),
212 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
213 return true;
214 }
215
216 std::unique_ptr<QueuedTask> task_;
217 const uint32_t milliseconds_;
218 const uint32_t posted_;
219};
220
tommic9bb7912017-02-24 10:42:14 -0800221TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
tommic06b1332016-05-14 11:31:40 -0700222 : event_base_(event_base_new()),
223 wakeup_event_(new event()),
tommic9bb7912017-02-24 10:42:14 -0800224 thread_(&TaskQueue::ThreadMain,
225 this,
226 queue_name,
227 TaskQueuePriorityToThreadPriority(priority)) {
tommic06b1332016-05-14 11:31:40 -0700228 RTC_DCHECK(queue_name);
229 int fds[2];
230 RTC_CHECK(pipe(fds) == 0);
231 SetNonBlocking(fds[0]);
232 SetNonBlocking(fds[1]);
233 wakeup_pipe_out_ = fds[0];
234 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800235
tommi1666b612016-07-13 10:58:12 -0700236 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
237 EV_READ | EV_PERSIST, OnWakeup, this);
tommic06b1332016-05-14 11:31:40 -0700238 event_add(wakeup_event_.get(), 0);
239 thread_.Start();
240}
241
242TaskQueue::~TaskQueue() {
243 RTC_DCHECK(!IsCurrent());
244 struct timespec ts;
245 char message = kQuit;
246 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
247 // The queue is full, so we have no choice but to wait and retry.
248 RTC_CHECK_EQ(EAGAIN, errno);
249 ts.tv_sec = 0;
250 ts.tv_nsec = 1000000;
251 nanosleep(&ts, nullptr);
252 }
253
254 thread_.Stop();
255
256 event_del(wakeup_event_.get());
tommi8c80c6e2017-02-23 00:34:52 -0800257
258 IgnoreSigPipeSignalOnCurrentThread();
259
tommic06b1332016-05-14 11:31:40 -0700260 close(wakeup_pipe_in_);
261 close(wakeup_pipe_out_);
262 wakeup_pipe_in_ = -1;
263 wakeup_pipe_out_ = -1;
264
tommic06b1332016-05-14 11:31:40 -0700265 event_base_free(event_base_);
266}
267
268// static
269TaskQueue* TaskQueue::Current() {
270 QueueContext* ctx =
271 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
272 return ctx ? ctx->queue : nullptr;
273}
274
275// static
276bool TaskQueue::IsCurrent(const char* queue_name) {
277 TaskQueue* current = Current();
278 return current && current->thread_.name().compare(queue_name) == 0;
279}
280
281bool TaskQueue::IsCurrent() const {
282 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
283}
284
285void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
286 RTC_DCHECK(task.get());
287 // libevent isn't thread safe. This means that we can't use methods such
288 // as event_base_once to post tasks to the worker thread from a different
289 // thread. However, we can use it when posting from the worker thread itself.
290 if (IsCurrent()) {
291 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
292 task.get(), nullptr) == 0) {
293 task.release();
294 }
295 } else {
296 QueuedTask* task_id = task.get(); // Only used for comparison.
297 {
298 CritScope lock(&pending_lock_);
299 pending_.push_back(std::move(task));
300 }
301 char message = kRunTask;
302 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
303 LOG(WARNING) << "Failed to queue task.";
304 CritScope lock(&pending_lock_);
305 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
306 return t.get() == task_id;
307 });
308 }
309 }
310}
311
312void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
313 uint32_t milliseconds) {
314 if (IsCurrent()) {
315 TimerEvent* timer = new TimerEvent(std::move(task));
tommi1666b612016-07-13 10:58:12 -0700316 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
tommic06b1332016-05-14 11:31:40 -0700317 QueueContext* ctx =
318 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
319 ctx->pending_timers_.push_back(timer);
320 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
321 event_add(&timer->ev, &tv);
322 } else {
323 PostTask(std::unique_ptr<QueuedTask>(
324 new SetTimerTask(std::move(task), milliseconds)));
325 }
326}
327
328void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
329 std::unique_ptr<QueuedTask> reply,
330 TaskQueue* reply_queue) {
331 std::unique_ptr<QueuedTask> wrapper_task(
tommi8c80c6e2017-02-23 00:34:52 -0800332 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
333 reply_queue->wakeup_pipe_in_));
tommic06b1332016-05-14 11:31:40 -0700334 PostTask(std::move(wrapper_task));
335}
336
337void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
338 std::unique_ptr<QueuedTask> reply) {
339 return PostTaskAndReply(std::move(task), std::move(reply), Current());
340}
341
342// static
tommi0f8b4032017-02-22 11:22:05 -0800343void TaskQueue::ThreadMain(void* context) {
tommic06b1332016-05-14 11:31:40 -0700344 TaskQueue* me = static_cast<TaskQueue*>(context);
345
346 QueueContext queue_context(me);
347 pthread_setspecific(GetQueuePtrTls(), &queue_context);
348
349 while (queue_context.is_active)
350 event_base_loop(me->event_base_, 0);
351
352 pthread_setspecific(GetQueuePtrTls(), nullptr);
353
354 for (TimerEvent* timer : queue_context.pending_timers_)
355 delete timer;
tommic06b1332016-05-14 11:31:40 -0700356}
357
358// static
359void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
360 QueueContext* ctx =
361 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
362 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
363 char buf;
364 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
365 switch (buf) {
366 case kQuit:
367 ctx->is_active = false;
368 event_base_loopbreak(ctx->queue->event_base_);
369 break;
370 case kRunTask: {
371 std::unique_ptr<QueuedTask> task;
372 {
373 CritScope lock(&ctx->queue->pending_lock_);
374 RTC_DCHECK(!ctx->queue->pending_.empty());
375 task = std::move(ctx->queue->pending_.front());
376 ctx->queue->pending_.pop_front();
377 RTC_DCHECK(task.get());
378 }
379 if (!task->Run())
380 task.release();
381 break;
382 }
tommi8c80c6e2017-02-23 00:34:52 -0800383 case kRunReplyTask: {
384 scoped_refptr<ReplyTaskOwnerRef> reply_task;
385 {
386 CritScope lock(&ctx->queue->pending_lock_);
387 for (auto it = ctx->queue->pending_replies_.begin();
388 it != ctx->queue->pending_replies_.end(); ++it) {
389 if ((*it)->HasOneRef()) {
390 reply_task = std::move(*it);
391 ctx->queue->pending_replies_.erase(it);
392 break;
393 }
394 }
395 }
396 reply_task->Run();
397 break;
398 }
tommic06b1332016-05-14 11:31:40 -0700399 default:
400 RTC_NOTREACHED();
401 break;
402 }
403}
404
405// static
406void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
407 auto* task = static_cast<QueuedTask*>(context);
408 if (task->Run())
409 delete task;
410}
411
412// static
413void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
414 TimerEvent* timer = static_cast<TimerEvent*>(context);
415 if (!timer->task->Run())
416 timer->task.release();
417 QueueContext* ctx =
418 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
419 ctx->pending_timers_.remove(timer);
420 delete timer;
421}
422
tommi8c80c6e2017-02-23 00:34:52 -0800423void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
tommic06b1332016-05-14 11:31:40 -0700424 RTC_DCHECK(reply_task);
425 CritScope lock(&pending_lock_);
tommi8c80c6e2017-02-23 00:34:52 -0800426 pending_replies_.push_back(std::move(reply_task));
tommic06b1332016-05-14 11:31:40 -0700427}
428
429} // namespace rtc