blob: c5229b1fe25addd14b4fee55259d0b1665c48857 [file] [log] [blame]
/*
 *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
10
11#ifndef WEBRTC_BASE_TASK_QUEUE_H_
12#define WEBRTC_BASE_TASK_QUEUE_H_
13
#include <stdint.h>

#include <list>
#include <memory>
#include <type_traits>
16
17#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
18#include <dispatch/dispatch.h>
19#endif
20
21#include "webrtc/base/constructormagic.h"
22#include "webrtc/base/criticalsection.h"
23
24#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT)
25#include "webrtc/base/platform_thread.h"
26#endif
27
28#if defined(WEBRTC_BUILD_LIBEVENT)
tommi8c80c6e2017-02-23 00:34:52 -080029#include "webrtc/base/refcountedobject.h"
30#include "webrtc/base/scoped_ref_ptr.h"
31
tommic06b1332016-05-14 11:31:40 -070032struct event_base;
33struct event;
34#endif
35
36namespace rtc {
37
38// Base interface for asynchronously executed tasks.
39// The interface basically consists of a single function, Run(), that executes
40// on the target queue. For more details see the Run() method and TaskQueue.
41class QueuedTask {
42 public:
43 QueuedTask() {}
44 virtual ~QueuedTask() {}
45
46 // Main routine that will run when the task is executed on the desired queue.
47 // The task should return |true| to indicate that it should be deleted or
48 // |false| to indicate that the queue should consider ownership of the task
49 // having been transferred. Returning |false| can be useful if a task has
50 // re-posted itself to a different queue or is otherwise being re-used.
51 virtual bool Run() = 0;
52
53 private:
54 RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
55};
56
57// Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
58template <class Closure>
59class ClosureTask : public QueuedTask {
60 public:
61 explicit ClosureTask(const Closure& closure) : closure_(closure) {}
62
63 private:
64 bool Run() override {
65 closure_();
66 return true;
67 }
68
69 Closure closure_;
70};
71
72// Extends ClosureTask to also allow specifying cleanup code.
73// This is useful when using lambdas if guaranteeing cleanup, even if a task
74// was dropped (queue is too full), is required.
75template <class Closure, class Cleanup>
76class ClosureTaskWithCleanup : public ClosureTask<Closure> {
77 public:
78 ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup)
79 : ClosureTask<Closure>(closure), cleanup_(cleanup) {}
80 ~ClosureTaskWithCleanup() { cleanup_(); }
81
82 private:
83 Cleanup cleanup_;
84};
85
86// Convenience function to construct closures that can be passed directly
87// to methods that support std::unique_ptr<QueuedTask> but not template
88// based parameters.
89template <class Closure>
90static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) {
91 return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure));
92}
93
94template <class Closure, class Cleanup>
95static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure,
96 const Cleanup& cleanup) {
97 return std::unique_ptr<QueuedTask>(
98 new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup));
99}
100
101// Implements a task queue that asynchronously executes tasks in a way that
102// guarantees that they're executed in FIFO order and that tasks never overlap.
103// Tasks may always execute on the same worker thread and they may not.
104// To DCHECK that tasks are executing on a known task queue, use IsCurrent().
105//
106// Here are some usage examples:
107//
108// 1) Asynchronously running a lambda:
109//
110// class MyClass {
111// ...
112// TaskQueue queue_("MyQueue");
113// };
114//
115// void MyClass::StartWork() {
116// queue_.PostTask([]() { Work(); });
117// ...
118//
119// 2) Doing work asynchronously on a worker queue and providing a notification
120// callback on the current queue, when the work has been done:
121//
122// void MyClass::StartWorkAndLetMeKnowWhenDone(
123// std::unique_ptr<QueuedTask> callback) {
124// DCHECK(TaskQueue::Current()) << "Need to be running on a queue";
125// queue_.PostTaskAndReply([]() { Work(); }, std::move(callback));
126// }
127// ...
128// my_class->StartWorkAndLetMeKnowWhenDone(
129// NewClosure([]() { LOG(INFO) << "The work is done!";}));
130//
131// 3) Posting a custom task on a timer. The task posts itself again after
132// every running:
133//
134// class TimerTask : public QueuedTask {
135// public:
136// TimerTask() {}
137// private:
138// bool Run() override {
139// ++count_;
140// TaskQueue::Current()->PostDelayedTask(
141// std::unique_ptr<QueuedTask>(this), 1000);
142// // Ownership has been transferred to the next occurance,
143// // so return false to prevent from being deleted now.
144// return false;
145// }
146// int count_ = 0;
147// };
148// ...
149// queue_.PostDelayedTask(
150// std::unique_ptr<QueuedTask>(new TimerTask()), 1000);
151//
152// For more examples, see task_queue_unittests.cc.
153//
154// A note on destruction:
155//
156// When a TaskQueue is deleted, pending tasks will not be executed but they will
157// be deleted. The deletion of tasks may happen asynchronously after the
158// TaskQueue itself has been deleted or it may happen synchronously while the
159// TaskQueue instance is being deleted. This may vary from one OS to the next
160// so assumptions about lifetimes of pending tasks should not be made.
danilchap8e572f02016-05-19 06:49:03 -0700161class LOCKABLE TaskQueue {
tommic06b1332016-05-14 11:31:40 -0700162 public:
tommic9bb7912017-02-24 10:42:14 -0800163 // TaskQueue priority levels. On some platforms these will map to thread
164 // priorities, on others such as Mac and iOS, GCD queue priorities.
165 enum class Priority {
166 NORMAL = 0,
167 HIGH,
168 LOW,
169 };
170
171 explicit TaskQueue(const char* queue_name,
172 Priority priority = Priority::NORMAL);
tommic06b1332016-05-14 11:31:40 -0700173 ~TaskQueue();
174
175 static TaskQueue* Current();
176
177 // Used for DCHECKing the current queue.
178 static bool IsCurrent(const char* queue_name);
179 bool IsCurrent() const;
180
Taylor Brandstetter5d97a9a2016-06-10 14:17:27 -0700181 // TODO(tommi): For better debuggability, implement RTC_FROM_HERE.
tommic06b1332016-05-14 11:31:40 -0700182
183 // Ownership of the task is passed to PostTask.
184 void PostTask(std::unique_ptr<QueuedTask> task);
185 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
186 std::unique_ptr<QueuedTask> reply,
187 TaskQueue* reply_queue);
188 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
189 std::unique_ptr<QueuedTask> reply);
190
tommif9d91542017-02-17 02:47:11 -0800191 // Schedules a task to execute a specified number of milliseconds from when
192 // the call is made. The precision should be considered as "best effort"
193 // and in some cases, such as on Windows when all high precision timers have
194 // been used up, can be off by as much as 15 millseconds (although 8 would be
195 // more likely). This can be mitigated by limiting the use of delayed tasks.
tommic06b1332016-05-14 11:31:40 -0700196 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
197
198 template <class Closure>
199 void PostTask(const Closure& closure) {
200 PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
201 }
202
tommif9d91542017-02-17 02:47:11 -0800203 // See documentation above for performance expectations.
tommic06b1332016-05-14 11:31:40 -0700204 template <class Closure>
205 void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
206 PostDelayedTask(
207 std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
208 milliseconds);
209 }
210
211 template <class Closure1, class Closure2>
212 void PostTaskAndReply(const Closure1& task,
213 const Closure2& reply,
214 TaskQueue* reply_queue) {
215 PostTaskAndReply(
216 std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
217 std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
218 reply_queue);
219 }
220
221 template <class Closure>
222 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
223 const Closure& reply) {
224 PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>(
225 new ClosureTask<Closure>(reply)));
226 }
227
228 template <class Closure>
229 void PostTaskAndReply(const Closure& task,
230 std::unique_ptr<QueuedTask> reply) {
231 PostTaskAndReply(
232 std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)),
233 std::move(reply));
234 }
235
236 template <class Closure1, class Closure2>
237 void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
238 PostTaskAndReply(
239 std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
240 std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
241 }
242
243 private:
244#if defined(WEBRTC_BUILD_LIBEVENT)
tommi0f8b4032017-02-22 11:22:05 -0800245 static void ThreadMain(void* context);
tommic06b1332016-05-14 11:31:40 -0700246 static void OnWakeup(int socket, short flags, void* context); // NOLINT
247 static void RunTask(int fd, short flags, void* context); // NOLINT
248 static void RunTimer(int fd, short flags, void* context); // NOLINT
249
tommi8c80c6e2017-02-23 00:34:52 -0800250 class ReplyTaskOwner;
tommic06b1332016-05-14 11:31:40 -0700251 class PostAndReplyTask;
252 class SetTimerTask;
253
tommi8c80c6e2017-02-23 00:34:52 -0800254 typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
255
256 void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
tommic06b1332016-05-14 11:31:40 -0700257
258 struct QueueContext;
259
260 int wakeup_pipe_in_ = -1;
261 int wakeup_pipe_out_ = -1;
262 event_base* event_base_;
263 std::unique_ptr<event> wakeup_event_;
264 PlatformThread thread_;
265 rtc::CriticalSection pending_lock_;
266 std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
tommi8c80c6e2017-02-23 00:34:52 -0800267 std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
268 GUARDED_BY(pending_lock_);
tommic06b1332016-05-14 11:31:40 -0700269#elif defined(WEBRTC_MAC)
270 struct QueueContext;
271 struct TaskContext;
272 struct PostTaskAndReplyContext;
273 dispatch_queue_t queue_;
274 QueueContext* const context_;
275#elif defined(WEBRTC_WIN)
tommi0b942152017-03-10 09:33:53 -0800276 class ThreadState;
tommi0f8b4032017-02-22 11:22:05 -0800277 static void ThreadMain(void* context);
tommic06b1332016-05-14 11:31:40 -0700278
279 class WorkerThread : public PlatformThread {
280 public:
tommic9bb7912017-02-24 10:42:14 -0800281 WorkerThread(ThreadRunFunction func,
282 void* obj,
283 const char* thread_name,
284 ThreadPriority priority)
285 : PlatformThread(func, obj, thread_name, priority) {}
tommic06b1332016-05-14 11:31:40 -0700286
287 bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
288 return PlatformThread::QueueAPC(apc_function, data);
289 }
290 };
291 WorkerThread thread_;
292#else
293#error not supported.
294#endif
295
296 RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
297};
298
299} // namespace rtc
300
301#endif // WEBRTC_BASE_TASK_QUEUE_H_