blob: 73fc23400b237cda4317d3d324152462f63ed72d [file] [log] [blame]
niklase@google.com470e71d2011-07-07 08:21:25 +00001/*
xians@webrtc.org6bde7a82012-02-20 08:39:25 +00002 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
niklase@google.com470e71d2011-07-07 08:21:25 +00003 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020011#include "modules/utility/source/process_thread_impl.h"
asapersson@webrtc.org8b2ec152014-04-11 07:59:43 +000012
Yves Gerey988cc082018-10-23 12:03:01 +020013#include <string>
14
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020015#include "modules/include/module.h"
16#include "rtc_base/checks.h"
Danil Chapovalov14273de2020-02-27 13:37:43 +010017#include "rtc_base/logging.h"
Steve Anton10542f22019-01-11 09:11:00 -080018#include "rtc_base/time_utils.h"
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020019#include "rtc_base/trace_event.h"
niklase@google.com470e71d2011-07-07 08:21:25 +000020
21namespace webrtc {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000022namespace {
tommi@webrtc.org3985f012015-02-27 13:36:34 +000023
// Sentinel stored in a module's `next_callback` to signal that the module has
// asked to be called back right away. While it is set, TimeUntilNextProcess()
// should not be queried; Process() should be invoked directly instead.
constexpr int64_t kCallProcessImmediately = -1;
28
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000029int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
30 int64_t interval = module->TimeUntilNextProcess();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000031 if (interval < 0) {
pbos9e260f12015-08-20 01:23:48 -070032 // Falling behind, we should call the callback now.
33 return time_now;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000034 }
35 return time_now + interval;
36}
Yves Gerey665174f2018-06-19 15:03:05 +020037} // namespace
niklase@google.com470e71d2011-07-07 08:21:25 +000038
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000039ProcessThread::~ProcessThread() {}
niklase@google.com470e71d2011-07-07 08:21:25 +000040
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000041// static
Yves Gerey665174f2018-06-19 15:03:05 +020042std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
kwiberg84be5112016-04-27 01:19:58 -070043 return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
niklase@google.com470e71d2011-07-07 08:21:25 +000044}
45
stefan847855b2015-09-11 09:52:15 -070046ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
Niels Möllerc572ff32018-11-07 08:43:50 +010047 : stop_(false), thread_name_(thread_name) {}
niklase@google.com470e71d2011-07-07 08:21:25 +000048
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000049ProcessThreadImpl::~ProcessThreadImpl() {
Sebastian Janssonc01367d2019-04-08 15:20:44 +020050 RTC_DCHECK(thread_checker_.IsCurrent());
henrikg91d6ede2015-09-17 00:24:34 -070051 RTC_DCHECK(!stop_);
tommi@webrtc.org03054482015-03-05 13:13:42 +000052
Danil Chapovalov14273de2020-02-27 13:37:43 +010053 while (!delayed_tasks_.empty()) {
54 delete delayed_tasks_.top().task;
55 delayed_tasks_.pop();
56 }
57
tommi@webrtc.org03054482015-03-05 13:13:42 +000058 while (!queue_.empty()) {
59 delete queue_.front();
60 queue_.pop();
61 }
niklase@google.com470e71d2011-07-07 08:21:25 +000062}
63
Danil Chapovalov14273de2020-02-27 13:37:43 +010064void ProcessThreadImpl::Delete() {
65 RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
66 << " is destroyed as a TaskQueue.";
67 Stop();
68 delete this;
69}
70
Niels Möller2bfddf72021-02-22 10:36:29 +010071// Doesn't need locking, because the contending thread isn't running.
72void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
Sebastian Janssonc01367d2019-04-08 15:20:44 +020073 RTC_DCHECK(thread_checker_.IsCurrent());
Markus Handellad5037b2021-05-07 15:02:36 +020074 RTC_DCHECK(thread_.empty());
75 if (!thread_.empty())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000076 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000077
henrikg91d6ede2015-09-17 00:24:34 -070078 RTC_DCHECK(!stop_);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000079
tommi0a2391f2017-03-21 02:31:51 -070080 for (ModuleCallback& m : modules_)
81 m.module->ProcessThreadAttached(this);
tommi@webrtc.org3985f012015-02-27 13:36:34 +000082
Markus Handellad5037b2021-05-07 15:02:36 +020083 thread_ = rtc::PlatformThread::SpawnJoinable(
84 [this] {
85 CurrentTaskQueueSetter set_current(this);
86 while (Process()) {
87 }
88 },
89 thread_name_);
niklase@google.com470e71d2011-07-07 08:21:25 +000090}
91
tommi@webrtc.org3985f012015-02-27 13:36:34 +000092void ProcessThreadImpl::Stop() {
Sebastian Janssonc01367d2019-04-08 15:20:44 +020093 RTC_DCHECK(thread_checker_.IsCurrent());
Markus Handellad5037b2021-05-07 15:02:36 +020094 if (thread_.empty())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000095 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000096
97 {
Niels Möller2bfddf72021-02-22 10:36:29 +010098 // Need to take lock, for synchronization with `thread_`.
Niels Möller47854022021-03-01 11:31:33 +010099 MutexLock lock(&mutex_);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000100 stop_ = true;
101 }
102
Niels Möller2c16cc62018-10-29 09:47:51 +0100103 wake_up_.Set();
Markus Handellad5037b2021-05-07 15:02:36 +0200104 thread_.Finalize();
Niels Möller2bfddf72021-02-22 10:36:29 +0100105
106 StopNoLocks();
107}
108
109// No locking needed, since this is called after the contending thread is
110// stopped.
111void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
Markus Handellad5037b2021-05-07 15:02:36 +0200112 RTC_DCHECK(thread_.empty());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000113 stop_ = false;
114
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000115 for (ModuleCallback& m : modules_)
116 m.module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000117}
118
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000119void ProcessThreadImpl::WakeUp(Module* module) {
120 // Allowed to be called on any thread.
Niels Möller47854022021-03-01 11:31:33 +0100121 auto holds_mutex = [this] {
122 if (!IsCurrent()) {
123 return false;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000124 }
Niels Möller47854022021-03-01 11:31:33 +0100125 RTC_DCHECK_RUN_ON(this);
126 return holds_mutex_;
127 };
128 if (holds_mutex()) {
129 // Avoid locking if called on the ProcessThread, via a module's Process),
130 WakeUpNoLocks(module);
131 } else {
132 MutexLock lock(&mutex_);
133 WakeUpInternal(module);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000134 }
Niels Möller2c16cc62018-10-29 09:47:51 +0100135 wake_up_.Set();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000136}
137
Niels Möller47854022021-03-01 11:31:33 +0100138// Must be called only indirectly from Process, which already holds the lock.
139void ProcessThreadImpl::WakeUpNoLocks(Module* module)
140 RTC_NO_THREAD_SAFETY_ANALYSIS {
141 RTC_DCHECK_RUN_ON(this);
142 WakeUpInternal(module);
143}
144
145void ProcessThreadImpl::WakeUpInternal(Module* module) {
146 for (ModuleCallback& m : modules_) {
147 if (m.module == module)
148 m.next_callback = kCallProcessImmediately;
149 }
150}
151
Danil Chapovalov959e9b62019-01-14 14:29:18 +0100152void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
Niels Möller47854022021-03-01 11:31:33 +0100153 // Allowed to be called on any thread, except from a module's Process method.
154 if (IsCurrent()) {
155 RTC_DCHECK_RUN_ON(this);
156 RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from "
157 "Module::Process is not supported";
158 }
tommi@webrtc.org03054482015-03-05 13:13:42 +0000159 {
Niels Möller47854022021-03-01 11:31:33 +0100160 MutexLock lock(&mutex_);
tommi@webrtc.org03054482015-03-05 13:13:42 +0000161 queue_.push(task.release());
162 }
Niels Möller2c16cc62018-10-29 09:47:51 +0100163 wake_up_.Set();
tommi@webrtc.org03054482015-03-05 13:13:42 +0000164}
165
Danil Chapovalov14273de2020-02-27 13:37:43 +0100166void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
167 uint32_t milliseconds) {
168 int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
169 bool recalculate_wakeup_time;
170 {
Niels Möller47854022021-03-01 11:31:33 +0100171 MutexLock lock(&mutex_);
Danil Chapovalov14273de2020-02-27 13:37:43 +0100172 recalculate_wakeup_time =
173 delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
174 delayed_tasks_.emplace(run_at_ms, std::move(task));
175 }
176 if (recalculate_wakeup_time) {
177 wake_up_.Set();
178 }
179}
180
tommidea489f2017-03-03 03:20:24 -0800181void ProcessThreadImpl::RegisterModule(Module* module,
182 const rtc::Location& from) {
Markus Handellfccb0522021-06-02 13:28:10 +0200183 TRACE_EVENT0("webrtc", "ProcessThreadImpl::RegisterModule");
Sebastian Janssonc01367d2019-04-08 15:20:44 +0200184 RTC_DCHECK(thread_checker_.IsCurrent());
tommicf39dd52017-07-07 16:24:34 -0700185 RTC_DCHECK(module) << from.ToString();
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000186
kwiberg5377bc72016-10-04 13:46:56 -0700187#if RTC_DCHECK_IS_ON
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000188 {
189 // Catch programmer error.
Niels Möller47854022021-03-01 11:31:33 +0100190 MutexLock lock(&mutex_);
tommicf39dd52017-07-07 16:24:34 -0700191 for (const ModuleCallback& mc : modules_) {
192 RTC_DCHECK(mc.module != module)
Jonas Olssonb2b20312020-01-14 12:11:31 +0100193 << "Already registered here: " << mc.location.ToString()
194 << "\n"
195 "Now attempting from here: "
196 << from.ToString();
tommicf39dd52017-07-07 16:24:34 -0700197 }
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000198 }
199#endif
200
201 // Now that we know the module isn't in the list, we'll call out to notify
202 // the module that it's attached to the worker thread. We don't hold
203 // the lock while we make this call.
Markus Handellad5037b2021-05-07 15:02:36 +0200204 if (!thread_.empty())
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000205 module->ProcessThreadAttached(this);
206
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000207 {
Niels Möller47854022021-03-01 11:31:33 +0100208 MutexLock lock(&mutex_);
tommidea489f2017-03-03 03:20:24 -0800209 modules_.push_back(ModuleCallback(module, from));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000210 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000211
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000212 // Wake the thread calling ProcessThreadImpl::Process() to update the
213 // waiting time. The waiting time for the just registered module may be
214 // shorter than all other registered modules.
Niels Möller2c16cc62018-10-29 09:47:51 +0100215 wake_up_.Set();
niklase@google.com470e71d2011-07-07 08:21:25 +0000216}
217
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000218void ProcessThreadImpl::DeRegisterModule(Module* module) {
Sebastian Janssonc01367d2019-04-08 15:20:44 +0200219 RTC_DCHECK(thread_checker_.IsCurrent());
henrikg91d6ede2015-09-17 00:24:34 -0700220 RTC_DCHECK(module);
Tommi7f375f02015-04-02 14:50:21 +0000221
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000222 {
Niels Möller47854022021-03-01 11:31:33 +0100223 MutexLock lock(&mutex_);
Yves Gerey665174f2018-06-19 15:03:05 +0200224 modules_.remove_if(
225 [&module](const ModuleCallback& m) { return m.module == module; });
Tommi7f375f02015-04-02 14:50:21 +0000226 }
tommi0a2391f2017-03-21 02:31:51 -0700227
228 // Notify the module that it's been detached.
229 module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000230}
231
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000232bool ProcessThreadImpl::Process() {
tommidea489f2017-03-03 03:20:24 -0800233 TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
Niels Möllerd28db7f2016-05-10 16:31:47 +0200234 int64_t now = rtc::TimeMillis();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000235 int64_t next_checkpoint = now + (1000 * 60);
Niels Möller47854022021-03-01 11:31:33 +0100236 RTC_DCHECK_RUN_ON(this);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000237 {
Niels Möller47854022021-03-01 11:31:33 +0100238 MutexLock lock(&mutex_);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000239 if (stop_)
240 return false;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000241 for (ModuleCallback& m : modules_) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000242 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
243 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
244 // operation should not require taking a lock, so querying all modules
245 // should run in a matter of nanoseconds.
246 if (m.next_callback == 0)
247 m.next_callback = GetNextCallbackTime(m.module, now);
niklase@google.com470e71d2011-07-07 08:21:25 +0000248
Niels Möller47854022021-03-01 11:31:33 +0100249 // Set to true for the duration of the calls to modules' Process().
250 holds_mutex_ = true;
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000251 if (m.next_callback <= now ||
252 m.next_callback == kCallProcessImmediately) {
tommidea489f2017-03-03 03:20:24 -0800253 {
254 TRACE_EVENT2("webrtc", "ModuleProcess", "function",
255 m.location.function_name(), "file",
Steve Antonc5d7c522019-12-03 10:14:05 -0800256 m.location.file_name());
tommidea489f2017-03-03 03:20:24 -0800257 m.module->Process();
258 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000259 // Use a new 'now' reference to calculate when the next callback
260 // should occur. We'll continue to use 'now' above for the baseline
261 // of calculating how long we should wait, to reduce variance.
Niels Möllerd28db7f2016-05-10 16:31:47 +0200262 int64_t new_now = rtc::TimeMillis();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000263 m.next_callback = GetNextCallbackTime(m.module, new_now);
264 }
Niels Möller47854022021-03-01 11:31:33 +0100265 holds_mutex_ = false;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000266
267 if (m.next_callback < next_checkpoint)
268 next_checkpoint = m.next_callback;
niklase@google.com470e71d2011-07-07 08:21:25 +0000269 }
tommi@webrtc.org03054482015-03-05 13:13:42 +0000270
Danil Chapovalov14273de2020-02-27 13:37:43 +0100271 while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
272 queue_.push(delayed_tasks_.top().task);
273 delayed_tasks_.pop();
274 }
275
276 if (!delayed_tasks_.empty()) {
277 next_checkpoint =
278 std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
279 }
280
tommi@webrtc.org03054482015-03-05 13:13:42 +0000281 while (!queue_.empty()) {
Danil Chapovalov959e9b62019-01-14 14:29:18 +0100282 QueuedTask* task = queue_.front();
tommi@webrtc.org03054482015-03-05 13:13:42 +0000283 queue_.pop();
Niels Möller47854022021-03-01 11:31:33 +0100284 mutex_.Unlock();
Danil Chapovalov14273de2020-02-27 13:37:43 +0100285 if (task->Run()) {
286 delete task;
287 }
Niels Möller47854022021-03-01 11:31:33 +0100288 mutex_.Lock();
tommi@webrtc.org03054482015-03-05 13:13:42 +0000289 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000290 }
291
Niels Möllerd28db7f2016-05-10 16:31:47 +0200292 int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000293 if (time_to_wait > 0)
Niels Möller2c16cc62018-10-29 09:47:51 +0100294 wake_up_.Wait(static_cast<int>(time_to_wait));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000295
296 return true;
niklase@google.com470e71d2011-07-07 08:21:25 +0000297}
pbos@webrtc.orgd900e8b2013-07-03 15:12:26 +0000298} // namespace webrtc