blob: 63633b5198ab227f8c9b4e2eacbb10552adda942 [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Danil Chapovaloveb175242019-02-12 10:44:38 +010011#include "rtc_base/task_queue_libevent.h"
tommic06b1332016-05-14 11:31:40 -070012
Yves Gerey988cc082018-10-23 12:03:01 +020013#include <errno.h>
tommic06b1332016-05-14 11:31:40 -070014#include <fcntl.h>
Yves Gerey988cc082018-10-23 12:03:01 +020015#include <pthread.h>
tommi8c80c6e2017-02-23 00:34:52 -080016#include <signal.h>
Yves Gerey988cc082018-10-23 12:03:01 +020017#include <stdint.h>
18#include <time.h>
tommic06b1332016-05-14 11:31:40 -070019#include <unistd.h>
Jonas Olssona4d87372019-07-05 19:08:33 +020020
Danil Chapovalov02fddf62018-02-12 12:41:16 +010021#include <list>
Yves Gerey988cc082018-10-23 12:03:01 +020022#include <memory>
23#include <type_traits>
24#include <utility>
tommic06b1332016-05-14 11:31:40 -070025
Steve Anton9a83dd72020-01-09 11:03:25 -080026#include "absl/container/inlined_vector.h"
Danil Chapovalov30c2a312022-07-19 14:12:43 +020027#include "absl/functional/any_invocable.h"
Danil Chapovaloveb175242019-02-12 10:44:38 +010028#include "absl/strings/string_view.h"
Danil Chapovaloveb175242019-02-12 10:44:38 +010029#include "api/task_queue/task_queue_base.h"
Danil Chapovalov30c2a312022-07-19 14:12:43 +020030#include "api/units/time_delta.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020031#include "rtc_base/checks.h"
32#include "rtc_base/logging.h"
Karl Wiberge40468b2017-11-22 10:42:26 +010033#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020034#include "rtc_base/platform_thread.h"
Yves Gerey988cc082018-10-23 12:03:01 +020035#include "rtc_base/platform_thread_types.h"
Markus Handell18523c32020-07-08 17:55:58 +020036#include "rtc_base/synchronization/mutex.h"
Yves Gerey988cc082018-10-23 12:03:01 +020037#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 09:11:00 -080038#include "rtc_base/time_utils.h"
Byoungchan Leed69a7262022-06-23 22:06:00 +090039#include "third_party/libevent/event.h"
tommic06b1332016-05-14 11:31:40 -070040
Danil Chapovaloveb175242019-02-12 10:44:38 +010041namespace webrtc {
tommic06b1332016-05-14 11:31:40 -070042namespace {
Danil Chapovaloveb175242019-02-12 10:44:38 +010043constexpr char kQuit = 1;
Steve Anton9a83dd72020-01-09 11:03:25 -080044constexpr char kRunTasks = 2;
tommi8c80c6e2017-02-23 00:34:52 -080045
Danil Chapovaloveb175242019-02-12 10:44:38 +010046using Priority = TaskQueueFactory::Priority;
tommic9bb7912017-02-24 10:42:14 -080047
tommi8c80c6e2017-02-23 00:34:52 -080048// This ignores the SIGPIPE signal on the calling thread.
49// This signal can be fired when trying to write() to a pipe that's being
50// closed or while closing a pipe that's being written to.
Danil Chapovalov43f39822018-12-05 15:46:58 +010051// We can run into that situation so we ignore this signal and continue as
52// normal.
tommi8c80c6e2017-02-23 00:34:52 -080053// As a side note for this implementation, it would be great if we could safely
54// restore the sigmask, but unfortunately the operation of restoring it, can
55// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
56// The SIGPIPE signal by default causes the process to be terminated, so we
57// don't want to risk that.
58// An alternative to this approach is to ignore the signal for the whole
59// process:
60// signal(SIGPIPE, SIG_IGN);
61void IgnoreSigPipeSignalOnCurrentThread() {
62 sigset_t sigpipe_mask;
63 sigemptyset(&sigpipe_mask);
64 sigaddset(&sigpipe_mask, SIGPIPE);
65 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
66}
tommic06b1332016-05-14 11:31:40 -070067
tommic06b1332016-05-14 11:31:40 -070068bool SetNonBlocking(int fd) {
69 const int flags = fcntl(fd, F_GETFL);
70 RTC_CHECK(flags != -1);
71 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72}
tommi1666b612016-07-13 10:58:12 -070073
74// TODO(tommi): This is a hack to support two versions of libevent that we're
75// compatible with. The method we really want to call is event_assign(),
76// since event_set() has been marked as deprecated (and doesn't accept
77// passing event_base__ as a parameter). However, the version of libevent
78// that we have in Chromium, doesn't have event_assign(), so we need to call
79// event_set() there.
80void EventAssign(struct event* ev,
81 struct event_base* base,
82 int fd,
83 short events,
84 void (*callback)(int, short, void*),
85 void* arg) {
86#if defined(_EVENT2_EVENT_H_)
87 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88#else
89 event_set(ev, fd, events, callback, arg);
90 RTC_CHECK_EQ(0, event_base_set(base, ev));
91#endif
92}
tommic9bb7912017-02-24 10:42:14 -080093
Danil Chapovaloveb175242019-02-12 10:44:38 +010094rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
tommic9bb7912017-02-24 10:42:14 -080095 switch (priority) {
96 case Priority::HIGH:
Markus Handellad5037b2021-05-07 15:02:36 +020097 return rtc::ThreadPriority::kRealtime;
tommic9bb7912017-02-24 10:42:14 -080098 case Priority::LOW:
Markus Handellad5037b2021-05-07 15:02:36 +020099 return rtc::ThreadPriority::kLow;
tommic9bb7912017-02-24 10:42:14 -0800100 case Priority::NORMAL:
Markus Handellad5037b2021-05-07 15:02:36 +0200101 return rtc::ThreadPriority::kNormal;
tommic9bb7912017-02-24 10:42:14 -0800102 }
tommic9bb7912017-02-24 10:42:14 -0800103}
tommic06b1332016-05-14 11:31:40 -0700104
Danil Chapovaloveb175242019-02-12 10:44:38 +0100105class TaskQueueLibevent final : public TaskQueueBase {
perkj650fdae2017-08-25 05:00:11 -0700106 public:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100107 TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
perkj650fdae2017-08-25 05:00:11 -0700108
Danil Chapovaloveb175242019-02-12 10:44:38 +0100109 void Delete() override;
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200110 void PostTask(absl::AnyInvocable<void() &&> task) override;
111 void PostDelayedTask(absl::AnyInvocable<void() &&> task,
112 TimeDelta delay) override;
113 void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
114 TimeDelta delay) override;
perkj650fdae2017-08-25 05:00:11 -0700115
116 private:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100117 struct TimerEvent;
118
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200119 void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
120 TimeDelta delay);
121
Danil Chapovaloveb175242019-02-12 10:44:38 +0100122 ~TaskQueueLibevent() override = default;
123
perkj650fdae2017-08-25 05:00:11 -0700124 static void OnWakeup(int socket, short flags, void* context); // NOLINT
perkj650fdae2017-08-25 05:00:11 -0700125 static void RunTimer(int fd, short flags, void* context); // NOLINT
126
Danil Chapovaloveb175242019-02-12 10:44:38 +0100127 bool is_active_ = true;
perkj650fdae2017-08-25 05:00:11 -0700128 int wakeup_pipe_in_ = -1;
129 int wakeup_pipe_out_ = -1;
130 event_base* event_base_;
Danil Chapovaloveb175242019-02-12 10:44:38 +0100131 event wakeup_event_;
132 rtc::PlatformThread thread_;
Markus Handell18523c32020-07-08 17:55:58 +0200133 Mutex pending_lock_;
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200134 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
Steve Anton9a83dd72020-01-09 11:03:25 -0800135 RTC_GUARDED_BY(pending_lock_);
tommic06b1332016-05-14 11:31:40 -0700136 // Holds a list of events pending timers for cleanup when the loop exits.
137 std::list<TimerEvent*> pending_timers_;
138};
139
Danil Chapovaloveb175242019-02-12 10:44:38 +0100140struct TaskQueueLibevent::TimerEvent {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200141 TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
Danil Chapovaloveb175242019-02-12 10:44:38 +0100142 : task_queue(task_queue), task(std::move(task)) {}
143 ~TimerEvent() { event_del(&ev); }
144
145 event ev;
146 TaskQueueLibevent* task_queue;
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200147 absl::AnyInvocable<void() &&> task;
tommic06b1332016-05-14 11:31:40 -0700148};
149
Danil Chapovaloveb175242019-02-12 10:44:38 +0100150TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
151 rtc::ThreadPriority priority)
Markus Handellad5037b2021-05-07 15:02:36 +0200152 : event_base_(event_base_new()) {
tommic06b1332016-05-14 11:31:40 -0700153 int fds[2];
154 RTC_CHECK(pipe(fds) == 0);
155 SetNonBlocking(fds[0]);
156 SetNonBlocking(fds[1]);
157 wakeup_pipe_out_ = fds[0];
158 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800159
Danil Chapovaloveb175242019-02-12 10:44:38 +0100160 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
tommi1666b612016-07-13 10:58:12 -0700161 EV_READ | EV_PERSIST, OnWakeup, this);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100162 event_add(&wakeup_event_, 0);
Markus Handellad5037b2021-05-07 15:02:36 +0200163 thread_ = rtc::PlatformThread::SpawnJoinable(
164 [this] {
165 {
166 CurrentTaskQueueSetter set_current(this);
167 while (is_active_)
168 event_base_loop(event_base_, 0);
Markus Handellad5037b2021-05-07 15:02:36 +0200169
Markus Handell82da9322022-12-16 15:50:24 +0100170 // Ensure remaining deleted tasks are destroyed with Current() set up
171 // to this task queue.
172 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending;
173 MutexLock lock(&pending_lock_);
174 pending_.swap(pending);
175 }
Markus Handellad5037b2021-05-07 15:02:36 +0200176 for (TimerEvent* timer : pending_timers_)
177 delete timer;
Markus Handell82da9322022-12-16 15:50:24 +0100178
179#if RTC_DCHECK_IS_ON
180 MutexLock lock(&pending_lock_);
181 RTC_DCHECK(pending_.empty());
182#endif
Markus Handellad5037b2021-05-07 15:02:36 +0200183 },
184 queue_name, rtc::ThreadAttributes().SetPriority(priority));
tommic06b1332016-05-14 11:31:40 -0700185}
186
Danil Chapovaloveb175242019-02-12 10:44:38 +0100187void TaskQueueLibevent::Delete() {
tommic06b1332016-05-14 11:31:40 -0700188 RTC_DCHECK(!IsCurrent());
189 struct timespec ts;
190 char message = kQuit;
191 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
192 // The queue is full, so we have no choice but to wait and retry.
193 RTC_CHECK_EQ(EAGAIN, errno);
194 ts.tv_sec = 0;
195 ts.tv_nsec = 1000000;
196 nanosleep(&ts, nullptr);
197 }
198
Markus Handellad5037b2021-05-07 15:02:36 +0200199 thread_.Finalize();
tommic06b1332016-05-14 11:31:40 -0700200
Danil Chapovaloveb175242019-02-12 10:44:38 +0100201 event_del(&wakeup_event_);
tommi8c80c6e2017-02-23 00:34:52 -0800202
203 IgnoreSigPipeSignalOnCurrentThread();
204
tommic06b1332016-05-14 11:31:40 -0700205 close(wakeup_pipe_in_);
206 close(wakeup_pipe_out_);
207 wakeup_pipe_in_ = -1;
208 wakeup_pipe_out_ = -1;
209
tommic06b1332016-05-14 11:31:40 -0700210 event_base_free(event_base_);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100211 delete this;
tommic06b1332016-05-14 11:31:40 -0700212}
213
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200214void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200215 {
Markus Handell18523c32020-07-08 17:55:58 +0200216 MutexLock lock(&pending_lock_);
Steve Anton9a83dd72020-01-09 11:03:25 -0800217 bool had_pending_tasks = !pending_.empty();
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200218 pending_.push_back(std::move(task));
Steve Anton9a83dd72020-01-09 11:03:25 -0800219
220 // Only write to the pipe if there were no pending tasks before this one
221 // since the thread could be sleeping. If there were already pending tasks
222 // then we know there's either a pending write in the pipe or the thread has
223 // not yet processed the pending tasks. In either case, the thread will
224 // eventually wake up and process all pending tasks including this one.
225 if (had_pending_tasks) {
226 return;
227 }
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200228 }
Steve Anton9a83dd72020-01-09 11:03:25 -0800229
230 // Note: This behvior outlined above ensures we never fill up the pipe write
231 // buffer since there will only ever be 1 byte pending.
232 char message = kRunTasks;
233 RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
234 sizeof(message));
tommic06b1332016-05-14 11:31:40 -0700235}
236
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200237void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
238 absl::AnyInvocable<void() &&> task,
239 TimeDelta delay) {
240 // libevent api is not thread safe by default, thus event_add need to be
241 // called on the `thread_`.
242 RTC_DCHECK(IsCurrent());
243
244 TimerEvent* timer = new TimerEvent(this, std::move(task));
245 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
246 timer);
247 pending_timers_.push_back(timer);
248 timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
249 .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
250 event_add(&timer->ev, &tv);
251}
252
253void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
254 TimeDelta delay) {
tommic06b1332016-05-14 11:31:40 -0700255 if (IsCurrent()) {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200256 PostDelayedTaskOnTaskQueue(std::move(task), delay);
tommic06b1332016-05-14 11:31:40 -0700257 } else {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200258 int64_t posted_us = rtc::TimeMicros();
259 PostTask([posted_us, delay, task = std::move(task), this]() mutable {
260 // Compensate for the time that has passed since the posting.
261 TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
262 PostDelayedTaskOnTaskQueue(
263 std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
264 });
tommic06b1332016-05-14 11:31:40 -0700265 }
266}
267
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200268void TaskQueueLibevent::PostDelayedHighPrecisionTask(
269 absl::AnyInvocable<void() &&> task,
270 TimeDelta delay) {
271 PostDelayedTask(std::move(task), delay);
272}
273
tommic06b1332016-05-14 11:31:40 -0700274// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100275void TaskQueueLibevent::OnWakeup(int socket,
276 short flags, // NOLINT
277 void* context) {
278 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
279 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
tommic06b1332016-05-14 11:31:40 -0700280 char buf;
281 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
282 switch (buf) {
283 case kQuit:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100284 me->is_active_ = false;
285 event_base_loopbreak(me->event_base_);
tommic06b1332016-05-14 11:31:40 -0700286 break;
Steve Anton9a83dd72020-01-09 11:03:25 -0800287 case kRunTasks: {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200288 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
tommic06b1332016-05-14 11:31:40 -0700289 {
Markus Handell18523c32020-07-08 17:55:58 +0200290 MutexLock lock(&me->pending_lock_);
Steve Anton9a83dd72020-01-09 11:03:25 -0800291 tasks.swap(me->pending_);
tommic06b1332016-05-14 11:31:40 -0700292 }
Steve Anton9a83dd72020-01-09 11:03:25 -0800293 RTC_DCHECK(!tasks.empty());
294 for (auto& task : tasks) {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200295 std::move(task)();
296 // Prefer to delete the `task` before running the next one.
297 task = nullptr;
Steve Anton9a83dd72020-01-09 11:03:25 -0800298 }
tommic06b1332016-05-14 11:31:40 -0700299 break;
300 }
301 default:
Artem Titovd3251962021-11-15 16:57:07 +0100302 RTC_DCHECK_NOTREACHED();
tommic06b1332016-05-14 11:31:40 -0700303 break;
304 }
305}
306
307// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100308void TaskQueueLibevent::RunTimer(int fd,
309 short flags, // NOLINT
310 void* context) {
tommic06b1332016-05-14 11:31:40 -0700311 TimerEvent* timer = static_cast<TimerEvent*>(context);
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200312 std::move(timer->task)();
Danil Chapovaloveb175242019-02-12 10:44:38 +0100313 timer->task_queue->pending_timers_.remove(timer);
tommic06b1332016-05-14 11:31:40 -0700314 delete timer;
315}
316
Danil Chapovaloveb175242019-02-12 10:44:38 +0100317class TaskQueueLibeventFactory final : public TaskQueueFactory {
318 public:
319 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
320 absl::string_view name,
321 Priority priority) const override {
322 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
323 new TaskQueueLibevent(name,
324 TaskQueuePriorityToThreadPriority(priority)));
325 }
326};
327
328} // namespace
329
330std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200331 return std::make_unique<TaskQueueLibeventFactory>();
perkj650fdae2017-08-25 05:00:11 -0700332}
333
Danil Chapovaloveb175242019-02-12 10:44:38 +0100334} // namespace webrtc