blob: f50e5a63dfd8c100039976d17dc8bc68c6ed1827 [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Danil Chapovaloveb175242019-02-12 10:44:38 +010011#include "rtc_base/task_queue_libevent.h"
tommic06b1332016-05-14 11:31:40 -070012
Yves Gerey988cc082018-10-23 12:03:01 +020013#include <errno.h>
tommic06b1332016-05-14 11:31:40 -070014#include <fcntl.h>
Yves Gerey988cc082018-10-23 12:03:01 +020015#include <pthread.h>
tommi8c80c6e2017-02-23 00:34:52 -080016#include <signal.h>
Yves Gerey988cc082018-10-23 12:03:01 +020017#include <stdint.h>
18#include <time.h>
tommic06b1332016-05-14 11:31:40 -070019#include <unistd.h>
Jonas Olssona4d87372019-07-05 19:08:33 +020020
Danil Chapovalov02fddf62018-02-12 12:41:16 +010021#include <list>
Yves Gerey988cc082018-10-23 12:03:01 +020022#include <memory>
23#include <type_traits>
24#include <utility>
tommic06b1332016-05-14 11:31:40 -070025
Steve Anton9a83dd72020-01-09 11:03:25 -080026#include "absl/container/inlined_vector.h"
Danil Chapovalov30c2a312022-07-19 14:12:43 +020027#include "absl/functional/any_invocable.h"
Danil Chapovaloveb175242019-02-12 10:44:38 +010028#include "absl/strings/string_view.h"
Danil Chapovaloveb175242019-02-12 10:44:38 +010029#include "api/task_queue/task_queue_base.h"
Danil Chapovalov30c2a312022-07-19 14:12:43 +020030#include "api/units/time_delta.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020031#include "rtc_base/checks.h"
32#include "rtc_base/logging.h"
Karl Wiberge40468b2017-11-22 10:42:26 +010033#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020034#include "rtc_base/platform_thread.h"
Yves Gerey988cc082018-10-23 12:03:01 +020035#include "rtc_base/platform_thread_types.h"
Markus Handell18523c32020-07-08 17:55:58 +020036#include "rtc_base/synchronization/mutex.h"
Yves Gerey988cc082018-10-23 12:03:01 +020037#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 09:11:00 -080038#include "rtc_base/time_utils.h"
Byoungchan Leed69a7262022-06-23 22:06:00 +090039#include "third_party/libevent/event.h"
tommic06b1332016-05-14 11:31:40 -070040
Danil Chapovaloveb175242019-02-12 10:44:38 +010041namespace webrtc {
tommic06b1332016-05-14 11:31:40 -070042namespace {
Danil Chapovaloveb175242019-02-12 10:44:38 +010043constexpr char kQuit = 1;
Steve Anton9a83dd72020-01-09 11:03:25 -080044constexpr char kRunTasks = 2;
tommi8c80c6e2017-02-23 00:34:52 -080045
Danil Chapovaloveb175242019-02-12 10:44:38 +010046using Priority = TaskQueueFactory::Priority;
tommic9bb7912017-02-24 10:42:14 -080047
// Blocks SIGPIPE on the calling thread by adding it to the thread's signal
// mask.
// The signal can be raised when write()-ing to a pipe that is being closed,
// or while closing a pipe that is still being written to. Both situations can
// occur here, so the signal is ignored and execution continues as normal.
// Ideally the previous signal mask would be restored afterwards, but the act
// of restoring it can itself cause SIGPIPE to be delivered (e.g. on MacOS).
// The default action for SIGPIPE terminates the process, so that risk is not
// taken.
// A process-wide alternative to this approach is:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  pthread_sigmask(SIG_BLOCK, &blocked_signals, /*oldset=*/nullptr);
}
tommic06b1332016-05-14 11:31:40 -070067
tommic06b1332016-05-14 11:31:40 -070068bool SetNonBlocking(int fd) {
69 const int flags = fcntl(fd, F_GETFL);
70 RTC_CHECK(flags != -1);
71 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
72}
tommi1666b612016-07-13 10:58:12 -070073
74// TODO(tommi): This is a hack to support two versions of libevent that we're
75// compatible with. The method we really want to call is event_assign(),
76// since event_set() has been marked as deprecated (and doesn't accept
77// passing event_base__ as a parameter). However, the version of libevent
78// that we have in Chromium, doesn't have event_assign(), so we need to call
79// event_set() there.
80void EventAssign(struct event* ev,
81 struct event_base* base,
82 int fd,
83 short events,
84 void (*callback)(int, short, void*),
85 void* arg) {
86#if defined(_EVENT2_EVENT_H_)
87 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
88#else
89 event_set(ev, fd, events, callback, arg);
90 RTC_CHECK_EQ(0, event_base_set(base, ev));
91#endif
92}
tommic9bb7912017-02-24 10:42:14 -080093
Danil Chapovaloveb175242019-02-12 10:44:38 +010094rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
tommic9bb7912017-02-24 10:42:14 -080095 switch (priority) {
96 case Priority::HIGH:
Markus Handellad5037b2021-05-07 15:02:36 +020097 return rtc::ThreadPriority::kRealtime;
tommic9bb7912017-02-24 10:42:14 -080098 case Priority::LOW:
Markus Handellad5037b2021-05-07 15:02:36 +020099 return rtc::ThreadPriority::kLow;
tommic9bb7912017-02-24 10:42:14 -0800100 case Priority::NORMAL:
Markus Handellad5037b2021-05-07 15:02:36 +0200101 return rtc::ThreadPriority::kNormal;
tommic9bb7912017-02-24 10:42:14 -0800102 }
tommic9bb7912017-02-24 10:42:14 -0800103}
tommic06b1332016-05-14 11:31:40 -0700104
Danil Chapovaloveb175242019-02-12 10:44:38 +0100105class TaskQueueLibevent final : public TaskQueueBase {
perkj650fdae2017-08-25 05:00:11 -0700106 public:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100107 TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
perkj650fdae2017-08-25 05:00:11 -0700108
Danil Chapovaloveb175242019-02-12 10:44:38 +0100109 void Delete() override;
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200110 void PostTask(absl::AnyInvocable<void() &&> task) override;
111 void PostDelayedTask(absl::AnyInvocable<void() &&> task,
112 TimeDelta delay) override;
113 void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
114 TimeDelta delay) override;
perkj650fdae2017-08-25 05:00:11 -0700115
116 private:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100117 struct TimerEvent;
118
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200119 void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
120 TimeDelta delay);
121
Danil Chapovaloveb175242019-02-12 10:44:38 +0100122 ~TaskQueueLibevent() override = default;
123
perkj650fdae2017-08-25 05:00:11 -0700124 static void OnWakeup(int socket, short flags, void* context); // NOLINT
perkj650fdae2017-08-25 05:00:11 -0700125 static void RunTimer(int fd, short flags, void* context); // NOLINT
126
Danil Chapovaloveb175242019-02-12 10:44:38 +0100127 bool is_active_ = true;
perkj650fdae2017-08-25 05:00:11 -0700128 int wakeup_pipe_in_ = -1;
129 int wakeup_pipe_out_ = -1;
130 event_base* event_base_;
Danil Chapovaloveb175242019-02-12 10:44:38 +0100131 event wakeup_event_;
132 rtc::PlatformThread thread_;
Markus Handell18523c32020-07-08 17:55:58 +0200133 Mutex pending_lock_;
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200134 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
Steve Anton9a83dd72020-01-09 11:03:25 -0800135 RTC_GUARDED_BY(pending_lock_);
tommic06b1332016-05-14 11:31:40 -0700136 // Holds a list of events pending timers for cleanup when the loop exits.
137 std::list<TimerEvent*> pending_timers_;
138};
139
Danil Chapovaloveb175242019-02-12 10:44:38 +0100140struct TaskQueueLibevent::TimerEvent {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200141 TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
Danil Chapovaloveb175242019-02-12 10:44:38 +0100142 : task_queue(task_queue), task(std::move(task)) {}
143 ~TimerEvent() { event_del(&ev); }
144
145 event ev;
146 TaskQueueLibevent* task_queue;
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200147 absl::AnyInvocable<void() &&> task;
tommic06b1332016-05-14 11:31:40 -0700148};
149
Danil Chapovaloveb175242019-02-12 10:44:38 +0100150TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
151 rtc::ThreadPriority priority)
Markus Handellad5037b2021-05-07 15:02:36 +0200152 : event_base_(event_base_new()) {
tommic06b1332016-05-14 11:31:40 -0700153 int fds[2];
154 RTC_CHECK(pipe(fds) == 0);
155 SetNonBlocking(fds[0]);
156 SetNonBlocking(fds[1]);
157 wakeup_pipe_out_ = fds[0];
158 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800159
Danil Chapovaloveb175242019-02-12 10:44:38 +0100160 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
tommi1666b612016-07-13 10:58:12 -0700161 EV_READ | EV_PERSIST, OnWakeup, this);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100162 event_add(&wakeup_event_, 0);
Markus Handellad5037b2021-05-07 15:02:36 +0200163 thread_ = rtc::PlatformThread::SpawnJoinable(
164 [this] {
165 {
166 CurrentTaskQueueSetter set_current(this);
167 while (is_active_)
168 event_base_loop(event_base_, 0);
169 }
170
171 for (TimerEvent* timer : pending_timers_)
172 delete timer;
173 },
174 queue_name, rtc::ThreadAttributes().SetPriority(priority));
tommic06b1332016-05-14 11:31:40 -0700175}
176
Danil Chapovaloveb175242019-02-12 10:44:38 +0100177void TaskQueueLibevent::Delete() {
tommic06b1332016-05-14 11:31:40 -0700178 RTC_DCHECK(!IsCurrent());
179 struct timespec ts;
180 char message = kQuit;
181 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
182 // The queue is full, so we have no choice but to wait and retry.
183 RTC_CHECK_EQ(EAGAIN, errno);
184 ts.tv_sec = 0;
185 ts.tv_nsec = 1000000;
186 nanosleep(&ts, nullptr);
187 }
188
Markus Handellad5037b2021-05-07 15:02:36 +0200189 thread_.Finalize();
tommic06b1332016-05-14 11:31:40 -0700190
Danil Chapovaloveb175242019-02-12 10:44:38 +0100191 event_del(&wakeup_event_);
tommi8c80c6e2017-02-23 00:34:52 -0800192
193 IgnoreSigPipeSignalOnCurrentThread();
194
tommic06b1332016-05-14 11:31:40 -0700195 close(wakeup_pipe_in_);
196 close(wakeup_pipe_out_);
197 wakeup_pipe_in_ = -1;
198 wakeup_pipe_out_ = -1;
199
tommic06b1332016-05-14 11:31:40 -0700200 event_base_free(event_base_);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100201 delete this;
tommic06b1332016-05-14 11:31:40 -0700202}
203
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200204void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200205 {
Markus Handell18523c32020-07-08 17:55:58 +0200206 MutexLock lock(&pending_lock_);
Steve Anton9a83dd72020-01-09 11:03:25 -0800207 bool had_pending_tasks = !pending_.empty();
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200208 pending_.push_back(std::move(task));
Steve Anton9a83dd72020-01-09 11:03:25 -0800209
210 // Only write to the pipe if there were no pending tasks before this one
211 // since the thread could be sleeping. If there were already pending tasks
212 // then we know there's either a pending write in the pipe or the thread has
213 // not yet processed the pending tasks. In either case, the thread will
214 // eventually wake up and process all pending tasks including this one.
215 if (had_pending_tasks) {
216 return;
217 }
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200218 }
Steve Anton9a83dd72020-01-09 11:03:25 -0800219
220 // Note: This behvior outlined above ensures we never fill up the pipe write
221 // buffer since there will only ever be 1 byte pending.
222 char message = kRunTasks;
223 RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
224 sizeof(message));
tommic06b1332016-05-14 11:31:40 -0700225}
226
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200227void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
228 absl::AnyInvocable<void() &&> task,
229 TimeDelta delay) {
230 // libevent api is not thread safe by default, thus event_add need to be
231 // called on the `thread_`.
232 RTC_DCHECK(IsCurrent());
233
234 TimerEvent* timer = new TimerEvent(this, std::move(task));
235 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
236 timer);
237 pending_timers_.push_back(timer);
238 timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
239 .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
240 event_add(&timer->ev, &tv);
241}
242
243void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
244 TimeDelta delay) {
tommic06b1332016-05-14 11:31:40 -0700245 if (IsCurrent()) {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200246 PostDelayedTaskOnTaskQueue(std::move(task), delay);
tommic06b1332016-05-14 11:31:40 -0700247 } else {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200248 int64_t posted_us = rtc::TimeMicros();
249 PostTask([posted_us, delay, task = std::move(task), this]() mutable {
250 // Compensate for the time that has passed since the posting.
251 TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
252 PostDelayedTaskOnTaskQueue(
253 std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
254 });
tommic06b1332016-05-14 11:31:40 -0700255 }
256}
257
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200258void TaskQueueLibevent::PostDelayedHighPrecisionTask(
259 absl::AnyInvocable<void() &&> task,
260 TimeDelta delay) {
261 PostDelayedTask(std::move(task), delay);
262}
263
tommic06b1332016-05-14 11:31:40 -0700264// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100265void TaskQueueLibevent::OnWakeup(int socket,
266 short flags, // NOLINT
267 void* context) {
268 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
269 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
tommic06b1332016-05-14 11:31:40 -0700270 char buf;
271 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
272 switch (buf) {
273 case kQuit:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100274 me->is_active_ = false;
275 event_base_loopbreak(me->event_base_);
tommic06b1332016-05-14 11:31:40 -0700276 break;
Steve Anton9a83dd72020-01-09 11:03:25 -0800277 case kRunTasks: {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200278 absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
tommic06b1332016-05-14 11:31:40 -0700279 {
Markus Handell18523c32020-07-08 17:55:58 +0200280 MutexLock lock(&me->pending_lock_);
Steve Anton9a83dd72020-01-09 11:03:25 -0800281 tasks.swap(me->pending_);
tommic06b1332016-05-14 11:31:40 -0700282 }
Steve Anton9a83dd72020-01-09 11:03:25 -0800283 RTC_DCHECK(!tasks.empty());
284 for (auto& task : tasks) {
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200285 std::move(task)();
286 // Prefer to delete the `task` before running the next one.
287 task = nullptr;
Steve Anton9a83dd72020-01-09 11:03:25 -0800288 }
tommic06b1332016-05-14 11:31:40 -0700289 break;
290 }
291 default:
Artem Titovd3251962021-11-15 16:57:07 +0100292 RTC_DCHECK_NOTREACHED();
tommic06b1332016-05-14 11:31:40 -0700293 break;
294 }
295}
296
297// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100298void TaskQueueLibevent::RunTimer(int fd,
299 short flags, // NOLINT
300 void* context) {
tommic06b1332016-05-14 11:31:40 -0700301 TimerEvent* timer = static_cast<TimerEvent*>(context);
Danil Chapovalov30c2a312022-07-19 14:12:43 +0200302 std::move(timer->task)();
Danil Chapovaloveb175242019-02-12 10:44:38 +0100303 timer->task_queue->pending_timers_.remove(timer);
tommic06b1332016-05-14 11:31:40 -0700304 delete timer;
305}
306
Danil Chapovaloveb175242019-02-12 10:44:38 +0100307class TaskQueueLibeventFactory final : public TaskQueueFactory {
308 public:
309 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
310 absl::string_view name,
311 Priority priority) const override {
312 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
313 new TaskQueueLibevent(name,
314 TaskQueuePriorityToThreadPriority(priority)));
315 }
316};
317
318} // namespace
319
320std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200321 return std::make_unique<TaskQueueLibeventFactory>();
perkj650fdae2017-08-25 05:00:11 -0700322}
323
Danil Chapovaloveb175242019-02-12 10:44:38 +0100324} // namespace webrtc