blob: c6ce5b3baf3e301550308d407a4866dfd05f8abb [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/base/task_queue.h"
12
13#include <fcntl.h>
tommi8c80c6e2017-02-23 00:34:52 -080014#include <signal.h>
tommic06b1332016-05-14 11:31:40 -070015#include <string.h>
16#include <unistd.h>
17
18#include "base/third_party/libevent/event.h"
19#include "webrtc/base/checks.h"
20#include "webrtc/base/logging.h"
21#include "webrtc/base/task_queue_posix.h"
22#include "webrtc/base/timeutils.h"
23
24namespace rtc {
25using internal::GetQueuePtrTls;
26using internal::AutoSetCurrentQueuePtr;
27
28namespace {
29static const char kQuit = 1;
30static const char kRunTask = 2;
tommi8c80c6e2017-02-23 00:34:52 -080031static const char kRunReplyTask = 3;
32
33// This ignores the SIGPIPE signal on the calling thread.
34// This signal can be fired when trying to write() to a pipe that's being
35// closed or while closing a pipe that's being written to.
36// We can run into that situation (e.g. reply tasks that don't get a chance to
37// run because the task queue is being deleted) so we ignore this signal and
38// continue as normal.
39// As a side note for this implementation, it would be great if we could safely
40// restore the sigmask, but unfortunately the operation of restoring it, can
41// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
42// The SIGPIPE signal by default causes the process to be terminated, so we
43// don't want to risk that.
44// An alternative to this approach is to ignore the signal for the whole
45// process:
46// signal(SIGPIPE, SIG_IGN);
47void IgnoreSigPipeSignalOnCurrentThread() {
48 sigset_t sigpipe_mask;
49 sigemptyset(&sigpipe_mask);
50 sigaddset(&sigpipe_mask, SIGPIPE);
51 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
52}
tommic06b1332016-05-14 11:31:40 -070053
54struct TimerEvent {
55 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
56 : task(std::move(task)) {}
57 ~TimerEvent() { event_del(&ev); }
58 event ev;
59 std::unique_ptr<QueuedTask> task;
60};
61
62bool SetNonBlocking(int fd) {
63 const int flags = fcntl(fd, F_GETFL);
64 RTC_CHECK(flags != -1);
65 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
66}
tommi1666b612016-07-13 10:58:12 -070067
68// TODO(tommi): This is a hack to support two versions of libevent that we're
69// compatible with. The method we really want to call is event_assign(),
70// since event_set() has been marked as deprecated (and doesn't accept
71// passing event_base__ as a parameter). However, the version of libevent
72// that we have in Chromium, doesn't have event_assign(), so we need to call
73// event_set() there.
74void EventAssign(struct event* ev,
75 struct event_base* base,
76 int fd,
77 short events,
78 void (*callback)(int, short, void*),
79 void* arg) {
80#if defined(_EVENT2_EVENT_H_)
81 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
82#else
83 event_set(ev, fd, events, callback, arg);
84 RTC_CHECK_EQ(0, event_base_set(base, ev));
85#endif
86}
tommic06b1332016-05-14 11:31:40 -070087} // namespace
88
89struct TaskQueue::QueueContext {
90 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
91 TaskQueue* queue;
92 bool is_active;
93 // Holds a list of events pending timers for cleanup when the loop exits.
94 std::list<TimerEvent*> pending_timers_;
95};
96
tommi8c80c6e2017-02-23 00:34:52 -080097// Posting a reply task is tricky business. This class owns the reply task
98// and a reference to it is held by both the reply queue and the first task.
99// Here's an outline of what happens when dealing with a reply task.
100// * The ReplyTaskOwner owns the |reply_| task.
101// * One ref owned by PostAndReplyTask
102// * One ref owned by the reply TaskQueue
103// * ReplyTaskOwner has a flag |run_task_| initially set to false.
104// * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
105// * After successfully running the original |task_|, PostAndReplyTask() calls
106// set_should_run_task(). This sets |run_task_| to true.
107// * In PostAndReplyTask's dtor:
108// * It releases its reference to ReplyTaskOwner (important to do this first).
109// * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
110// * PostAndReplyTask doesn't care if write() fails, but when it does:
111// * The reply queue is gone.
112// * ReplyTaskOwner has already been deleted and the reply task too.
113// * If write() succeeds:
114// * ReplyQueue receives the kRunReplyTask message
115// * Goes through all pending tasks, finding the first that HasOneRef()
116// * Calls ReplyTaskOwner::Run()
117// * if set_should_run_task() was called, the reply task will be run
118// * Release the reference to ReplyTaskOwner
119// * ReplyTaskOwner and associated |reply_| are deleted.
120class TaskQueue::ReplyTaskOwner {
121 public:
122 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
123 : reply_(std::move(reply)) {}
124
125 void Run() {
126 RTC_DCHECK(reply_);
127 if (run_task_) {
128 if (!reply_->Run())
129 reply_.release();
130 }
131 reply_.reset();
132 }
133
134 void set_should_run_task() {
135 RTC_DCHECK(!run_task_);
136 run_task_ = true;
137 }
138
139 private:
140 std::unique_ptr<QueuedTask> reply_;
141 bool run_task_ = false;
142};
143
tommic06b1332016-05-14 11:31:40 -0700144class TaskQueue::PostAndReplyTask : public QueuedTask {
145 public:
146 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
147 std::unique_ptr<QueuedTask> reply,
tommi8c80c6e2017-02-23 00:34:52 -0800148 TaskQueue* reply_queue,
149 int reply_pipe)
tommic06b1332016-05-14 11:31:40 -0700150 : task_(std::move(task)),
tommi8c80c6e2017-02-23 00:34:52 -0800151 reply_pipe_(reply_pipe),
152 reply_task_owner_(
153 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
154 reply_queue->PrepareReplyTask(reply_task_owner_);
tommic06b1332016-05-14 11:31:40 -0700155 }
156
157 ~PostAndReplyTask() override {
tommi8c80c6e2017-02-23 00:34:52 -0800158 reply_task_owner_ = nullptr;
159 IgnoreSigPipeSignalOnCurrentThread();
160 // Send a signal to the reply queue that the reply task can run now.
161 // Depending on whether |set_should_run_task()| was called by the
162 // PostAndReplyTask(), the reply task may or may not actually run.
163 // In either case, it will be deleted.
164 char message = kRunReplyTask;
165 write(reply_pipe_, &message, sizeof(message));
tommic06b1332016-05-14 11:31:40 -0700166 }
167
168 private:
169 bool Run() override {
170 if (!task_->Run())
171 task_.release();
tommi8c80c6e2017-02-23 00:34:52 -0800172 reply_task_owner_->set_should_run_task();
tommic06b1332016-05-14 11:31:40 -0700173 return true;
174 }
175
tommic06b1332016-05-14 11:31:40 -0700176 std::unique_ptr<QueuedTask> task_;
tommi8c80c6e2017-02-23 00:34:52 -0800177 int reply_pipe_;
178 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
tommic06b1332016-05-14 11:31:40 -0700179};
180
181class TaskQueue::SetTimerTask : public QueuedTask {
182 public:
183 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
184 : task_(std::move(task)),
185 milliseconds_(milliseconds),
186 posted_(Time32()) {}
187
188 private:
189 bool Run() override {
190 // Compensate for the time that has passed since construction
191 // and until we got here.
192 uint32_t post_time = Time32() - posted_;
193 TaskQueue::Current()->PostDelayedTask(
194 std::move(task_),
195 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
196 return true;
197 }
198
199 std::unique_ptr<QueuedTask> task_;
200 const uint32_t milliseconds_;
201 const uint32_t posted_;
202};
203
204TaskQueue::TaskQueue(const char* queue_name)
205 : event_base_(event_base_new()),
206 wakeup_event_(new event()),
207 thread_(&TaskQueue::ThreadMain, this, queue_name) {
208 RTC_DCHECK(queue_name);
209 int fds[2];
210 RTC_CHECK(pipe(fds) == 0);
211 SetNonBlocking(fds[0]);
212 SetNonBlocking(fds[1]);
213 wakeup_pipe_out_ = fds[0];
214 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800215
tommi1666b612016-07-13 10:58:12 -0700216 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
217 EV_READ | EV_PERSIST, OnWakeup, this);
tommic06b1332016-05-14 11:31:40 -0700218 event_add(wakeup_event_.get(), 0);
219 thread_.Start();
220}
221
222TaskQueue::~TaskQueue() {
223 RTC_DCHECK(!IsCurrent());
224 struct timespec ts;
225 char message = kQuit;
226 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
227 // The queue is full, so we have no choice but to wait and retry.
228 RTC_CHECK_EQ(EAGAIN, errno);
229 ts.tv_sec = 0;
230 ts.tv_nsec = 1000000;
231 nanosleep(&ts, nullptr);
232 }
233
234 thread_.Stop();
235
236 event_del(wakeup_event_.get());
tommi8c80c6e2017-02-23 00:34:52 -0800237
238 IgnoreSigPipeSignalOnCurrentThread();
239
tommic06b1332016-05-14 11:31:40 -0700240 close(wakeup_pipe_in_);
241 close(wakeup_pipe_out_);
242 wakeup_pipe_in_ = -1;
243 wakeup_pipe_out_ = -1;
244
tommic06b1332016-05-14 11:31:40 -0700245 event_base_free(event_base_);
246}
247
248// static
249TaskQueue* TaskQueue::Current() {
250 QueueContext* ctx =
251 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
252 return ctx ? ctx->queue : nullptr;
253}
254
255// static
256bool TaskQueue::IsCurrent(const char* queue_name) {
257 TaskQueue* current = Current();
258 return current && current->thread_.name().compare(queue_name) == 0;
259}
260
261bool TaskQueue::IsCurrent() const {
262 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
263}
264
265void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
266 RTC_DCHECK(task.get());
267 // libevent isn't thread safe. This means that we can't use methods such
268 // as event_base_once to post tasks to the worker thread from a different
269 // thread. However, we can use it when posting from the worker thread itself.
270 if (IsCurrent()) {
271 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
272 task.get(), nullptr) == 0) {
273 task.release();
274 }
275 } else {
276 QueuedTask* task_id = task.get(); // Only used for comparison.
277 {
278 CritScope lock(&pending_lock_);
279 pending_.push_back(std::move(task));
280 }
281 char message = kRunTask;
282 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
283 LOG(WARNING) << "Failed to queue task.";
284 CritScope lock(&pending_lock_);
285 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
286 return t.get() == task_id;
287 });
288 }
289 }
290}
291
292void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
293 uint32_t milliseconds) {
294 if (IsCurrent()) {
295 TimerEvent* timer = new TimerEvent(std::move(task));
tommi1666b612016-07-13 10:58:12 -0700296 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
tommic06b1332016-05-14 11:31:40 -0700297 QueueContext* ctx =
298 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
299 ctx->pending_timers_.push_back(timer);
300 timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
301 event_add(&timer->ev, &tv);
302 } else {
303 PostTask(std::unique_ptr<QueuedTask>(
304 new SetTimerTask(std::move(task), milliseconds)));
305 }
306}
307
308void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
309 std::unique_ptr<QueuedTask> reply,
310 TaskQueue* reply_queue) {
311 std::unique_ptr<QueuedTask> wrapper_task(
tommi8c80c6e2017-02-23 00:34:52 -0800312 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
313 reply_queue->wakeup_pipe_in_));
tommic06b1332016-05-14 11:31:40 -0700314 PostTask(std::move(wrapper_task));
315}
316
317void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
318 std::unique_ptr<QueuedTask> reply) {
319 return PostTaskAndReply(std::move(task), std::move(reply), Current());
320}
321
322// static
tommi0f8b4032017-02-22 11:22:05 -0800323void TaskQueue::ThreadMain(void* context) {
tommic06b1332016-05-14 11:31:40 -0700324 TaskQueue* me = static_cast<TaskQueue*>(context);
325
326 QueueContext queue_context(me);
327 pthread_setspecific(GetQueuePtrTls(), &queue_context);
328
329 while (queue_context.is_active)
330 event_base_loop(me->event_base_, 0);
331
332 pthread_setspecific(GetQueuePtrTls(), nullptr);
333
334 for (TimerEvent* timer : queue_context.pending_timers_)
335 delete timer;
tommic06b1332016-05-14 11:31:40 -0700336}
337
338// static
339void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
340 QueueContext* ctx =
341 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
342 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
343 char buf;
344 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
345 switch (buf) {
346 case kQuit:
347 ctx->is_active = false;
348 event_base_loopbreak(ctx->queue->event_base_);
349 break;
350 case kRunTask: {
351 std::unique_ptr<QueuedTask> task;
352 {
353 CritScope lock(&ctx->queue->pending_lock_);
354 RTC_DCHECK(!ctx->queue->pending_.empty());
355 task = std::move(ctx->queue->pending_.front());
356 ctx->queue->pending_.pop_front();
357 RTC_DCHECK(task.get());
358 }
359 if (!task->Run())
360 task.release();
361 break;
362 }
tommi8c80c6e2017-02-23 00:34:52 -0800363 case kRunReplyTask: {
364 scoped_refptr<ReplyTaskOwnerRef> reply_task;
365 {
366 CritScope lock(&ctx->queue->pending_lock_);
367 for (auto it = ctx->queue->pending_replies_.begin();
368 it != ctx->queue->pending_replies_.end(); ++it) {
369 if ((*it)->HasOneRef()) {
370 reply_task = std::move(*it);
371 ctx->queue->pending_replies_.erase(it);
372 break;
373 }
374 }
375 }
376 reply_task->Run();
377 break;
378 }
tommic06b1332016-05-14 11:31:40 -0700379 default:
380 RTC_NOTREACHED();
381 break;
382 }
383}
384
385// static
386void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
387 auto* task = static_cast<QueuedTask*>(context);
388 if (task->Run())
389 delete task;
390}
391
392// static
393void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
394 TimerEvent* timer = static_cast<TimerEvent*>(context);
395 if (!timer->task->Run())
396 timer->task.release();
397 QueueContext* ctx =
398 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
399 ctx->pending_timers_.remove(timer);
400 delete timer;
401}
402
tommi8c80c6e2017-02-23 00:34:52 -0800403void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
tommic06b1332016-05-14 11:31:40 -0700404 RTC_DCHECK(reply_task);
405 CritScope lock(&pending_lock_);
tommi8c80c6e2017-02-23 00:34:52 -0800406 pending_replies_.push_back(std::move(reply_task));
tommic06b1332016-05-14 11:31:40 -0700407}
408
409} // namespace rtc