blob: 7638869bbc8d41d07a2d4fcaaf17044cc7c230a9 [file] [log] [blame]
tommic06b1332016-05-14 11:31:40 -07001/*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Danil Chapovaloveb175242019-02-12 10:44:38 +010011#include "rtc_base/task_queue_libevent.h"
tommic06b1332016-05-14 11:31:40 -070012
Yves Gerey988cc082018-10-23 12:03:01 +020013#include <errno.h>
tommic06b1332016-05-14 11:31:40 -070014#include <fcntl.h>
Yves Gerey988cc082018-10-23 12:03:01 +020015#include <pthread.h>
tommi8c80c6e2017-02-23 00:34:52 -080016#include <signal.h>
Yves Gerey988cc082018-10-23 12:03:01 +020017#include <stdint.h>
18#include <time.h>
tommic06b1332016-05-14 11:31:40 -070019#include <unistd.h>
Jonas Olssona4d87372019-07-05 19:08:33 +020020
Danil Chapovalov02fddf62018-02-12 12:41:16 +010021#include <list>
Yves Gerey988cc082018-10-23 12:03:01 +020022#include <memory>
23#include <type_traits>
24#include <utility>
tommic06b1332016-05-14 11:31:40 -070025
Danil Chapovaloveb175242019-02-12 10:44:38 +010026#include "absl/strings/string_view.h"
27#include "api/task_queue/queued_task.h"
28#include "api/task_queue/task_queue_base.h"
tommic06b1332016-05-14 11:31:40 -070029#include "base/third_party/libevent/event.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020030#include "rtc_base/checks.h"
Steve Anton10542f22019-01-11 09:11:00 -080031#include "rtc_base/critical_section.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020032#include "rtc_base/logging.h"
Karl Wiberge40468b2017-11-22 10:42:26 +010033#include "rtc_base/numerics/safe_conversions.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020034#include "rtc_base/platform_thread.h"
Yves Gerey988cc082018-10-23 12:03:01 +020035#include "rtc_base/platform_thread_types.h"
Yves Gerey988cc082018-10-23 12:03:01 +020036#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 09:11:00 -080037#include "rtc_base/time_utils.h"
tommic06b1332016-05-14 11:31:40 -070038
Danil Chapovaloveb175242019-02-12 10:44:38 +010039namespace webrtc {
tommic06b1332016-05-14 11:31:40 -070040namespace {
Danil Chapovaloveb175242019-02-12 10:44:38 +010041constexpr char kQuit = 1;
42constexpr char kRunTask = 2;
tommi8c80c6e2017-02-23 00:34:52 -080043
Danil Chapovaloveb175242019-02-12 10:44:38 +010044using Priority = TaskQueueFactory::Priority;
tommic9bb7912017-02-24 10:42:14 -080045
tommi8c80c6e2017-02-23 00:34:52 -080046// This ignores the SIGPIPE signal on the calling thread.
47// This signal can be fired when trying to write() to a pipe that's being
48// closed or while closing a pipe that's being written to.
Danil Chapovalov43f39822018-12-05 15:46:58 +010049// We can run into that situation so we ignore this signal and continue as
50// normal.
tommi8c80c6e2017-02-23 00:34:52 -080051// As a side note for this implementation, it would be great if we could safely
52// restore the sigmask, but unfortunately the operation of restoring it, can
53// itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
54// The SIGPIPE signal by default causes the process to be terminated, so we
55// don't want to risk that.
56// An alternative to this approach is to ignore the signal for the whole
57// process:
58// signal(SIGPIPE, SIG_IGN);
59void IgnoreSigPipeSignalOnCurrentThread() {
60 sigset_t sigpipe_mask;
61 sigemptyset(&sigpipe_mask);
62 sigaddset(&sigpipe_mask, SIGPIPE);
63 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
64}
tommic06b1332016-05-14 11:31:40 -070065
tommic06b1332016-05-14 11:31:40 -070066bool SetNonBlocking(int fd) {
67 const int flags = fcntl(fd, F_GETFL);
68 RTC_CHECK(flags != -1);
69 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
70}
tommi1666b612016-07-13 10:58:12 -070071
72// TODO(tommi): This is a hack to support two versions of libevent that we're
73// compatible with. The method we really want to call is event_assign(),
74// since event_set() has been marked as deprecated (and doesn't accept
75// passing event_base__ as a parameter). However, the version of libevent
76// that we have in Chromium, doesn't have event_assign(), so we need to call
77// event_set() there.
78void EventAssign(struct event* ev,
79 struct event_base* base,
80 int fd,
81 short events,
82 void (*callback)(int, short, void*),
83 void* arg) {
84#if defined(_EVENT2_EVENT_H_)
85 RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
86#else
87 event_set(ev, fd, events, callback, arg);
88 RTC_CHECK_EQ(0, event_base_set(base, ev));
89#endif
90}
tommic9bb7912017-02-24 10:42:14 -080091
Danil Chapovaloveb175242019-02-12 10:44:38 +010092rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
tommic9bb7912017-02-24 10:42:14 -080093 switch (priority) {
94 case Priority::HIGH:
Danil Chapovaloveb175242019-02-12 10:44:38 +010095 return rtc::kRealtimePriority;
tommic9bb7912017-02-24 10:42:14 -080096 case Priority::LOW:
Danil Chapovaloveb175242019-02-12 10:44:38 +010097 return rtc::kLowPriority;
tommic9bb7912017-02-24 10:42:14 -080098 case Priority::NORMAL:
Danil Chapovaloveb175242019-02-12 10:44:38 +010099 return rtc::kNormalPriority;
tommic9bb7912017-02-24 10:42:14 -0800100 default:
101 RTC_NOTREACHED();
102 break;
103 }
Danil Chapovaloveb175242019-02-12 10:44:38 +0100104 return rtc::kNormalPriority;
tommic9bb7912017-02-24 10:42:14 -0800105}
tommic06b1332016-05-14 11:31:40 -0700106
Danil Chapovaloveb175242019-02-12 10:44:38 +0100107class TaskQueueLibevent final : public TaskQueueBase {
perkj650fdae2017-08-25 05:00:11 -0700108 public:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100109 TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
perkj650fdae2017-08-25 05:00:11 -0700110
Danil Chapovaloveb175242019-02-12 10:44:38 +0100111 void Delete() override;
112 void PostTask(std::unique_ptr<QueuedTask> task) override;
113 void PostDelayedTask(std::unique_ptr<QueuedTask> task,
114 uint32_t milliseconds) override;
perkj650fdae2017-08-25 05:00:11 -0700115
116 private:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100117 class SetTimerTask;
118 struct TimerEvent;
119
120 ~TaskQueueLibevent() override = default;
121
perkj650fdae2017-08-25 05:00:11 -0700122 static void ThreadMain(void* context);
123 static void OnWakeup(int socket, short flags, void* context); // NOLINT
perkj650fdae2017-08-25 05:00:11 -0700124 static void RunTimer(int fd, short flags, void* context); // NOLINT
125
Danil Chapovaloveb175242019-02-12 10:44:38 +0100126 bool is_active_ = true;
perkj650fdae2017-08-25 05:00:11 -0700127 int wakeup_pipe_in_ = -1;
128 int wakeup_pipe_out_ = -1;
129 event_base* event_base_;
Danil Chapovaloveb175242019-02-12 10:44:38 +0100130 event wakeup_event_;
131 rtc::PlatformThread thread_;
perkj650fdae2017-08-25 05:00:11 -0700132 rtc::CriticalSection pending_lock_;
danilchap3c6abd22017-09-06 05:46:29 -0700133 std::list<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_);
tommic06b1332016-05-14 11:31:40 -0700134 // Holds a list of events pending timers for cleanup when the loop exits.
135 std::list<TimerEvent*> pending_timers_;
136};
137
Danil Chapovaloveb175242019-02-12 10:44:38 +0100138struct TaskQueueLibevent::TimerEvent {
139 TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
140 : task_queue(task_queue), task(std::move(task)) {}
141 ~TimerEvent() { event_del(&ev); }
142
143 event ev;
144 TaskQueueLibevent* task_queue;
145 std::unique_ptr<QueuedTask> task;
146};
147
148class TaskQueueLibevent::SetTimerTask : public QueuedTask {
tommic06b1332016-05-14 11:31:40 -0700149 public:
150 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
151 : task_(std::move(task)),
152 milliseconds_(milliseconds),
Danil Chapovaloveb175242019-02-12 10:44:38 +0100153 posted_(rtc::Time32()) {}
tommic06b1332016-05-14 11:31:40 -0700154
155 private:
156 bool Run() override {
157 // Compensate for the time that has passed since construction
158 // and until we got here.
Danil Chapovaloveb175242019-02-12 10:44:38 +0100159 uint32_t post_time = rtc::Time32() - posted_;
160 TaskQueueLibevent::Current()->PostDelayedTask(
tommic06b1332016-05-14 11:31:40 -0700161 std::move(task_),
162 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
163 return true;
164 }
165
166 std::unique_ptr<QueuedTask> task_;
167 const uint32_t milliseconds_;
168 const uint32_t posted_;
169};
170
Danil Chapovaloveb175242019-02-12 10:44:38 +0100171TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
172 rtc::ThreadPriority priority)
173 : event_base_(event_base_new()),
174 thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {
tommic06b1332016-05-14 11:31:40 -0700175 int fds[2];
176 RTC_CHECK(pipe(fds) == 0);
177 SetNonBlocking(fds[0]);
178 SetNonBlocking(fds[1]);
179 wakeup_pipe_out_ = fds[0];
180 wakeup_pipe_in_ = fds[1];
tommi8c80c6e2017-02-23 00:34:52 -0800181
Danil Chapovaloveb175242019-02-12 10:44:38 +0100182 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
tommi1666b612016-07-13 10:58:12 -0700183 EV_READ | EV_PERSIST, OnWakeup, this);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100184 event_add(&wakeup_event_, 0);
tommic06b1332016-05-14 11:31:40 -0700185 thread_.Start();
186}
187
Danil Chapovaloveb175242019-02-12 10:44:38 +0100188void TaskQueueLibevent::Delete() {
tommic06b1332016-05-14 11:31:40 -0700189 RTC_DCHECK(!IsCurrent());
190 struct timespec ts;
191 char message = kQuit;
192 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
193 // The queue is full, so we have no choice but to wait and retry.
194 RTC_CHECK_EQ(EAGAIN, errno);
195 ts.tv_sec = 0;
196 ts.tv_nsec = 1000000;
197 nanosleep(&ts, nullptr);
198 }
199
200 thread_.Stop();
201
Danil Chapovaloveb175242019-02-12 10:44:38 +0100202 event_del(&wakeup_event_);
tommi8c80c6e2017-02-23 00:34:52 -0800203
204 IgnoreSigPipeSignalOnCurrentThread();
205
tommic06b1332016-05-14 11:31:40 -0700206 close(wakeup_pipe_in_);
207 close(wakeup_pipe_out_);
208 wakeup_pipe_in_ = -1;
209 wakeup_pipe_out_ = -1;
210
tommic06b1332016-05-14 11:31:40 -0700211 event_base_free(event_base_);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100212 delete this;
tommic06b1332016-05-14 11:31:40 -0700213}
214
Danil Chapovaloveb175242019-02-12 10:44:38 +0100215void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
Danil Chapovalov00e71ef2019-06-11 18:01:56 +0200216 QueuedTask* task_id = task.get(); // Only used for comparison.
217 {
218 rtc::CritScope lock(&pending_lock_);
219 pending_.push_back(std::move(task));
220 }
221 char message = kRunTask;
222 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
223 RTC_LOG(WARNING) << "Failed to queue task.";
224 rtc::CritScope lock(&pending_lock_);
225 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
226 return t.get() == task_id;
227 });
tommic06b1332016-05-14 11:31:40 -0700228 }
229}
230
Danil Chapovaloveb175242019-02-12 10:44:38 +0100231void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
232 uint32_t milliseconds) {
tommic06b1332016-05-14 11:31:40 -0700233 if (IsCurrent()) {
Danil Chapovaloveb175242019-02-12 10:44:38 +0100234 TimerEvent* timer = new TimerEvent(this, std::move(task));
235 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
perkj650fdae2017-08-25 05:00:11 -0700236 timer);
Danil Chapovaloveb175242019-02-12 10:44:38 +0100237 pending_timers_.push_back(timer);
kwiberg5b9746e2017-08-16 04:52:35 -0700238 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
239 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
tommic06b1332016-05-14 11:31:40 -0700240 event_add(&timer->ev, &tv);
241 } else {
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200242 PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
tommic06b1332016-05-14 11:31:40 -0700243 }
244}
245
tommic06b1332016-05-14 11:31:40 -0700246// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100247void TaskQueueLibevent::ThreadMain(void* context) {
248 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
tommic06b1332016-05-14 11:31:40 -0700249
Danil Chapovaloveb175242019-02-12 10:44:38 +0100250 {
251 CurrentTaskQueueSetter set_current(me);
252 while (me->is_active_)
253 event_base_loop(me->event_base_, 0);
254 }
tommic06b1332016-05-14 11:31:40 -0700255
Danil Chapovaloveb175242019-02-12 10:44:38 +0100256 for (TimerEvent* timer : me->pending_timers_)
tommic06b1332016-05-14 11:31:40 -0700257 delete timer;
tommic06b1332016-05-14 11:31:40 -0700258}
259
260// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100261void TaskQueueLibevent::OnWakeup(int socket,
262 short flags, // NOLINT
263 void* context) {
264 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
265 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
tommic06b1332016-05-14 11:31:40 -0700266 char buf;
267 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
268 switch (buf) {
269 case kQuit:
Danil Chapovaloveb175242019-02-12 10:44:38 +0100270 me->is_active_ = false;
271 event_base_loopbreak(me->event_base_);
tommic06b1332016-05-14 11:31:40 -0700272 break;
273 case kRunTask: {
274 std::unique_ptr<QueuedTask> task;
275 {
Danil Chapovaloveb175242019-02-12 10:44:38 +0100276 rtc::CritScope lock(&me->pending_lock_);
277 RTC_DCHECK(!me->pending_.empty());
278 task = std::move(me->pending_.front());
279 me->pending_.pop_front();
tommic06b1332016-05-14 11:31:40 -0700280 RTC_DCHECK(task.get());
281 }
282 if (!task->Run())
283 task.release();
284 break;
285 }
286 default:
287 RTC_NOTREACHED();
288 break;
289 }
290}
291
292// static
Danil Chapovaloveb175242019-02-12 10:44:38 +0100293void TaskQueueLibevent::RunTimer(int fd,
294 short flags, // NOLINT
295 void* context) {
tommic06b1332016-05-14 11:31:40 -0700296 TimerEvent* timer = static_cast<TimerEvent*>(context);
297 if (!timer->task->Run())
298 timer->task.release();
Danil Chapovaloveb175242019-02-12 10:44:38 +0100299 timer->task_queue->pending_timers_.remove(timer);
tommic06b1332016-05-14 11:31:40 -0700300 delete timer;
301}
302
Danil Chapovaloveb175242019-02-12 10:44:38 +0100303class TaskQueueLibeventFactory final : public TaskQueueFactory {
304 public:
305 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
306 absl::string_view name,
307 Priority priority) const override {
308 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
309 new TaskQueueLibevent(name,
310 TaskQueuePriorityToThreadPriority(priority)));
311 }
312};
313
314} // namespace
315
316std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
Mirko Bonadei317a1f02019-09-17 17:06:18 +0200317 return std::make_unique<TaskQueueLibeventFactory>();
perkj650fdae2017-08-25 05:00:11 -0700318}
319
Danil Chapovaloveb175242019-02-12 10:44:38 +0100320} // namespace webrtc