blob: ab97b0cf840ad99ab35ef4c98438925f94b136a5 [file] [log] [blame]
/*
 *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020011#include "rtc_base/task_queue.h"
tommic06b1332016-05-14 11:31:40 -070012
Yves Gerey988cc082018-10-23 12:03:01 +020013#include <errno.h>
tommic06b1332016-05-14 11:31:40 -070014#include <fcntl.h>
Yves Gerey988cc082018-10-23 12:03:01 +020015#include <pthread.h>
tommi8c80c6e2017-02-23 00:34:52 -080016#include <signal.h>
Yves Gerey988cc082018-10-23 12:03:01 +020017#include <stdint.h>
18#include <time.h>
tommic06b1332016-05-14 11:31:40 -070019#include <unistd.h>
Danil Chapovalov02fddf62018-02-12 12:41:16 +010020#include <list>
Yves Gerey988cc082018-10-23 12:03:01 +020021#include <memory>
22#include <type_traits>
23#include <utility>
tommic06b1332016-05-14 11:31:40 -070024
25#include "base/third_party/libevent/event.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020026#include "rtc_base/checks.h"
Steve Anton10542f22019-01-11 09:11:00 -080027#include "rtc_base/critical_section.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020028#include "rtc_base/logging.h"
Karl Wiberge40468b2017-11-22 10:42:26 +010029#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020030#include "rtc_base/platform_thread.h"
Yves Gerey988cc082018-10-23 12:03:01 +020031#include "rtc_base/platform_thread_types.h"
Steve Anton10542f22019-01-11 09:11:00 -080032#include "rtc_base/ref_count.h"
33#include "rtc_base/ref_counted_object.h"
Yves Gerey988cc082018-10-23 12:03:01 +020034#include "rtc_base/scoped_ref_ptr.h"
Niels Möllera12c42a2018-07-25 16:05:48 +020035#include "rtc_base/system/unused.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020036#include "rtc_base/task_queue_posix.h"
Yves Gerey988cc082018-10-23 12:03:01 +020037#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 09:11:00 -080038#include "rtc_base/time_utils.h"
tommic06b1332016-05-14 11:31:40 -070039
40namespace rtc {
41using internal::GetQueuePtrTls;
42using internal::AutoSetCurrentQueuePtr;
43
44namespace {
45static const char kQuit = 1;
46static const char kRunTask = 2;
tommi8c80c6e2017-02-23 00:34:52 -080047
tommic9bb7912017-02-24 10:42:14 -080048using Priority = TaskQueue::Priority;
49
// Blocks delivery of SIGPIPE to the calling thread.
//
// SIGPIPE can be raised when write() is called on a pipe that is being closed,
// or when a pipe is closed while it is being written to. Both situations can
// occur here, and since the default action for SIGPIPE is to terminate the
// process, the signal is blocked and execution continues as normal.
//
// The previous signal mask is deliberately not restored afterwards: restoring
// it can itself cause SIGPIPE to be signaled (e.g. on MacOS).
//
// An alternative is to ignore the signal for the whole process:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
tommic06b1332016-05-14 11:31:40 -070069
70struct TimerEvent {
71 explicit TimerEvent(std::unique_ptr<QueuedTask> task)
72 : task(std::move(task)) {}
73 ~TimerEvent() { event_del(&ev); }
74 event ev;
75 std::unique_ptr<QueuedTask> task;
76};
77
78bool SetNonBlocking(int fd) {
79 const int flags = fcntl(fd, F_GETFL);
80 RTC_CHECK(flags != -1);
81 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
82}
tommi1666b612016-07-13 10:58:12 -070083
84// TODO(tommi): This is a hack to support two versions of libevent that we're
85// compatible with. The method we really want to call is event_assign(),
86// since event_set() has been marked as deprecated (and doesn't accept
87// passing event_base__ as a parameter). However, the version of libevent
88// that we have in Chromium, doesn't have event_assign(), so we need to call
89// event_set() there.
90void EventAssign(struct event* ev,
91 struct event_base* base,
92 int fd,
93 short events,
94 void (*callback)(int, short, void*),
95 void* arg) {
96#if defined(_EVENT2_EVENT_H_)
97 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
98#else
99 event_set(ev, fd, events, callback, arg);
100 RTC_CHECK_EQ(0, event_base_set(base, ev));
101#endif
102}
tommic9bb7912017-02-24 10:42:14 -0800103
104ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
105 switch (priority) {
106 case Priority::HIGH:
107 return kRealtimePriority;
108 case Priority::LOW:
109 return kLowPriority;
110 case Priority::NORMAL:
111 return kNormalPriority;
112 default:
113 RTC_NOTREACHED();
114 break;
115 }
116 return kNormalPriority;
117}
tommic06b1332016-05-14 11:31:40 -0700118} // namespace
119
perkj650fdae2017-08-25 05:00:11 -0700120class TaskQueue::Impl : public RefCountInterface {
121 public:
122 explicit Impl(const char* queue_name,
123 TaskQueue* queue,
124 Priority priority = Priority::NORMAL);
125 ~Impl() override;
126
127 static TaskQueue::Impl* Current();
128 static TaskQueue* CurrentQueue();
129
130 // Used for DCHECKing the current queue.
perkj650fdae2017-08-25 05:00:11 -0700131 bool IsCurrent() const;
132
133 void PostTask(std::unique_ptr<QueuedTask> task);
perkj650fdae2017-08-25 05:00:11 -0700134 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
135
136 private:
137 static void ThreadMain(void* context);
138 static void OnWakeup(int socket, short flags, void* context); // NOLINT
139 static void RunTask(int fd, short flags, void* context); // NOLINT
140 static void RunTimer(int fd, short flags, void* context); // NOLINT
141
perkj650fdae2017-08-25 05:00:11 -0700142 class SetTimerTask;
143
perkj650fdae2017-08-25 05:00:11 -0700144 struct QueueContext;
145 TaskQueue* const queue_;
146 int wakeup_pipe_in_ = -1;
147 int wakeup_pipe_out_ = -1;
148 event_base* event_base_;
149 std::unique_ptr<event> wakeup_event_;
150 PlatformThread thread_;
151 rtc::CriticalSection pending_lock_;
danilchap3c6abd22017-09-06 05:46:29 -0700152 std::list<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_);
perkj650fdae2017-08-25 05:00:11 -0700153};
154
155struct TaskQueue::Impl::QueueContext {
156 explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
157 TaskQueue::Impl* queue;
tommic06b1332016-05-14 11:31:40 -0700158 bool is_active;
159 // Holds a list of events pending timers for cleanup when the loop exits.
160 std::list<TimerEvent*> pending_timers_;
161};
162
perkj650fdae2017-08-25 05:00:11 -0700163class TaskQueue::Impl::SetTimerTask : public QueuedTask {
tommic06b1332016-05-14 11:31:40 -0700164 public:
165 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
166 : task_(std::move(task)),
167 milliseconds_(milliseconds),
168 posted_(Time32()) {}
169
170 private:
171 bool Run() override {
172 // Compensate for the time that has passed since construction
173 // and until we got here.
174 uint32_t post_time = Time32() - posted_;
perkj650fdae2017-08-25 05:00:11 -0700175 TaskQueue::Impl::Current()->PostDelayedTask(
tommic06b1332016-05-14 11:31:40 -0700176 std::move(task_),
177 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
178 return true;
179 }
180
181 std::unique_ptr<QueuedTask> task_;
182 const uint32_t milliseconds_;
183 const uint32_t posted_;
184};
185
perkj650fdae2017-08-25 05:00:11 -0700186TaskQueue::Impl::Impl(const char* queue_name,
187 TaskQueue* queue,
188 Priority priority /*= NORMAL*/)
189 : queue_(queue),
190 event_base_(event_base_new()),
tommic06b1332016-05-14 11:31:40 -0700191 wakeup_event_(new event()),
perkj650fdae2017-08-25 05:00:11 -0700192 thread_(&TaskQueue::Impl::ThreadMain,
tommic9bb7912017-02-24 10:42:14 -0800193 this,
194 queue_name,
195 TaskQueuePriorityToThreadPriority(priority)) {
tommic06b1332016-05-14 11:31:40 -0700196 RTC_DCHECK(queue_name);
197 int fds[2];
198 RTC_CHECK(pipe(fds) == 0);
199 SetNonBlocking(fds[0]);
200 SetNonBlocking(fds[1]);
201 wakeup_pipe_out_ = fds[0];
202 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800203
tommi1666b612016-07-13 10:58:12 -0700204 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
205 EV_READ | EV_PERSIST, OnWakeup, this);
tommic06b1332016-05-14 11:31:40 -0700206 event_add(wakeup_event_.get(), 0);
207 thread_.Start();
208}
209
perkj650fdae2017-08-25 05:00:11 -0700210TaskQueue::Impl::~Impl() {
tommic06b1332016-05-14 11:31:40 -0700211 RTC_DCHECK(!IsCurrent());
212 struct timespec ts;
213 char message = kQuit;
214 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
215 // The queue is full, so we have no choice but to wait and retry.
216 RTC_CHECK_EQ(EAGAIN, errno);
217 ts.tv_sec = 0;
218 ts.tv_nsec = 1000000;
219 nanosleep(&ts, nullptr);
220 }
221
222 thread_.Stop();
223
224 event_del(wakeup_event_.get());
tommi8c80c6e2017-02-23 00:34:52 -0800225
226 IgnoreSigPipeSignalOnCurrentThread();
227
tommic06b1332016-05-14 11:31:40 -0700228 close(wakeup_pipe_in_);
229 close(wakeup_pipe_out_);
230 wakeup_pipe_in_ = -1;
231 wakeup_pipe_out_ = -1;
232
tommic06b1332016-05-14 11:31:40 -0700233 event_base_free(event_base_);
234}
235
236// static
perkj650fdae2017-08-25 05:00:11 -0700237TaskQueue::Impl* TaskQueue::Impl::Current() {
tommic06b1332016-05-14 11:31:40 -0700238 QueueContext* ctx =
239 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
240 return ctx ? ctx->queue : nullptr;
241}
242
243// static
perkj650fdae2017-08-25 05:00:11 -0700244TaskQueue* TaskQueue::Impl::CurrentQueue() {
245 TaskQueue::Impl* current = Current();
246 if (current) {
247 return current->queue_;
248 }
249 return nullptr;
250}
251
perkj650fdae2017-08-25 05:00:11 -0700252bool TaskQueue::Impl::IsCurrent() const {
tommic06b1332016-05-14 11:31:40 -0700253 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
254}
255
perkj650fdae2017-08-25 05:00:11 -0700256void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
tommic06b1332016-05-14 11:31:40 -0700257 RTC_DCHECK(task.get());
258 // libevent isn't thread safe. This means that we can't use methods such
259 // as event_base_once to post tasks to the worker thread from a different
260 // thread. However, we can use it when posting from the worker thread itself.
261 if (IsCurrent()) {
perkj650fdae2017-08-25 05:00:11 -0700262 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
tommic06b1332016-05-14 11:31:40 -0700263 task.get(), nullptr) == 0) {
264 task.release();
265 }
266 } else {
267 QueuedTask* task_id = task.get(); // Only used for comparison.
268 {
269 CritScope lock(&pending_lock_);
270 pending_.push_back(std::move(task));
271 }
272 char message = kRunTask;
273 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
Mirko Bonadei675513b2017-11-09 11:09:25 +0100274 RTC_LOG(WARNING) << "Failed to queue task.";
tommic06b1332016-05-14 11:31:40 -0700275 CritScope lock(&pending_lock_);
276 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
277 return t.get() == task_id;
278 });
279 }
280 }
281}
282
perkj650fdae2017-08-25 05:00:11 -0700283void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
284 uint32_t milliseconds) {
tommic06b1332016-05-14 11:31:40 -0700285 if (IsCurrent()) {
286 TimerEvent* timer = new TimerEvent(std::move(task));
perkj650fdae2017-08-25 05:00:11 -0700287 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
288 timer);
tommic06b1332016-05-14 11:31:40 -0700289 QueueContext* ctx =
290 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
291 ctx->pending_timers_.push_back(timer);
kwiberg5b9746e2017-08-16 04:52:35 -0700292 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
293 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
tommic06b1332016-05-14 11:31:40 -0700294 event_add(&timer->ev, &tv);
295 } else {
296 PostTask(std::unique_ptr<QueuedTask>(
297 new SetTimerTask(std::move(task), milliseconds)));
298 }
299}
300
tommic06b1332016-05-14 11:31:40 -0700301// static
perkj650fdae2017-08-25 05:00:11 -0700302void TaskQueue::Impl::ThreadMain(void* context) {
303 TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
tommic06b1332016-05-14 11:31:40 -0700304
305 QueueContext queue_context(me);
306 pthread_setspecific(GetQueuePtrTls(), &queue_context);
307
308 while (queue_context.is_active)
309 event_base_loop(me->event_base_, 0);
310
311 pthread_setspecific(GetQueuePtrTls(), nullptr);
312
313 for (TimerEvent* timer : queue_context.pending_timers_)
314 delete timer;
tommic06b1332016-05-14 11:31:40 -0700315}
316
317// static
perkj650fdae2017-08-25 05:00:11 -0700318void TaskQueue::Impl::OnWakeup(int socket,
319 short flags,
320 void* context) { // NOLINT
tommic06b1332016-05-14 11:31:40 -0700321 QueueContext* ctx =
322 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
323 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
324 char buf;
325 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
326 switch (buf) {
327 case kQuit:
328 ctx->is_active = false;
329 event_base_loopbreak(ctx->queue->event_base_);
330 break;
331 case kRunTask: {
332 std::unique_ptr<QueuedTask> task;
333 {
334 CritScope lock(&ctx->queue->pending_lock_);
335 RTC_DCHECK(!ctx->queue->pending_.empty());
336 task = std::move(ctx->queue->pending_.front());
337 ctx->queue->pending_.pop_front();
338 RTC_DCHECK(task.get());
339 }
340 if (!task->Run())
341 task.release();
342 break;
343 }
344 default:
345 RTC_NOTREACHED();
346 break;
347 }
348}
349
350// static
perkj650fdae2017-08-25 05:00:11 -0700351void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT
tommic06b1332016-05-14 11:31:40 -0700352 auto* task = static_cast<QueuedTask*>(context);
353 if (task->Run())
354 delete task;
355}
356
357// static
perkj650fdae2017-08-25 05:00:11 -0700358void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
tommic06b1332016-05-14 11:31:40 -0700359 TimerEvent* timer = static_cast<TimerEvent*>(context);
360 if (!timer->task->Run())
361 timer->task.release();
362 QueueContext* ctx =
363 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
364 ctx->pending_timers_.remove(timer);
365 delete timer;
366}
367
perkj650fdae2017-08-25 05:00:11 -0700368TaskQueue::TaskQueue(const char* queue_name, Priority priority)
369 : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
370}
371
372TaskQueue::~TaskQueue() {}
373
374// static
375TaskQueue* TaskQueue::Current() {
376 return TaskQueue::Impl::CurrentQueue();
377}
378
379// Used for DCHECKing the current queue.
perkj650fdae2017-08-25 05:00:11 -0700380bool TaskQueue::IsCurrent() const {
381 return impl_->IsCurrent();
382}
383
384void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
385 return TaskQueue::impl_->PostTask(std::move(task));
386}
387
perkj650fdae2017-08-25 05:00:11 -0700388void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
389 uint32_t milliseconds) {
390 return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
391}
392
tommic06b1332016-05-14 11:31:40 -0700393} // namespace rtc