blob: 73338ac4fa0b2abd8f5da22491163b8dd2277163 [file] [log] [blame]
niklase@google.com470e71d2011-07-07 08:21:25 +00001/*
xians@webrtc.org6bde7a82012-02-20 08:39:25 +00002 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
niklase@google.com470e71d2011-07-07 08:21:25 +00003 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020011#include "modules/utility/source/process_thread_impl.h"
asapersson@webrtc.org8b2ec152014-04-11 07:59:43 +000012
Yves Gerey988cc082018-10-23 12:03:01 +020013#include <string>
14
Mirko Bonadei92ea95e2017-09-15 06:47:31 +020015#include "modules/include/module.h"
16#include "rtc_base/checks.h"
17#include "rtc_base/task_queue.h"
18#include "rtc_base/timeutils.h"
19#include "rtc_base/trace_event.h"
niklase@google.com470e71d2011-07-07 08:21:25 +000020
21namespace webrtc {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000022namespace {
tommi@webrtc.org3985f012015-02-27 13:36:34 +000023
24// We use this constant internally to signal that a module has requested
25// a callback right away. When this is set, no call to TimeUntilNextProcess
26// should be made, but Process() should be called directly.
27const int64_t kCallProcessImmediately = -1;
28
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000029int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
30 int64_t interval = module->TimeUntilNextProcess();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000031 if (interval < 0) {
pbos9e260f12015-08-20 01:23:48 -070032 // Falling behind, we should call the callback now.
33 return time_now;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000034 }
35 return time_now + interval;
36}
Yves Gerey665174f2018-06-19 15:03:05 +020037} // namespace
niklase@google.com470e71d2011-07-07 08:21:25 +000038
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000039ProcessThread::~ProcessThread() {}
niklase@google.com470e71d2011-07-07 08:21:25 +000040
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000041// static
Yves Gerey665174f2018-06-19 15:03:05 +020042std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
kwiberg84be5112016-04-27 01:19:58 -070043 return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
niklase@google.com470e71d2011-07-07 08:21:25 +000044}
45
stefan847855b2015-09-11 09:52:15 -070046ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
Niels Möller2c16cc62018-10-29 09:47:51 +010047 : wake_up_(/*manual_reset=*/false, /*initially_signaled=*/false),
stefan847855b2015-09-11 09:52:15 -070048 stop_(false),
49 thread_name_(thread_name) {}
niklase@google.com470e71d2011-07-07 08:21:25 +000050
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000051ProcessThreadImpl::~ProcessThreadImpl() {
henrikg91d6ede2015-09-17 00:24:34 -070052 RTC_DCHECK(thread_checker_.CalledOnValidThread());
53 RTC_DCHECK(!thread_.get());
54 RTC_DCHECK(!stop_);
tommi@webrtc.org03054482015-03-05 13:13:42 +000055
56 while (!queue_.empty()) {
57 delete queue_.front();
58 queue_.pop();
59 }
niklase@google.com470e71d2011-07-07 08:21:25 +000060}
61
tommi@webrtc.org3985f012015-02-27 13:36:34 +000062void ProcessThreadImpl::Start() {
henrikg91d6ede2015-09-17 00:24:34 -070063 RTC_DCHECK(thread_checker_.CalledOnValidThread());
64 RTC_DCHECK(!thread_.get());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000065 if (thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000066 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000067
henrikg91d6ede2015-09-17 00:24:34 -070068 RTC_DCHECK(!stop_);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000069
tommi0a2391f2017-03-21 02:31:51 -070070 for (ModuleCallback& m : modules_)
71 m.module->ProcessThreadAttached(this);
tommi@webrtc.org3985f012015-02-27 13:36:34 +000072
Peter Boström8c38e8b2015-11-26 17:45:47 +010073 thread_.reset(
74 new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
75 thread_->Start();
niklase@google.com470e71d2011-07-07 08:21:25 +000076}
77
tommi@webrtc.org3985f012015-02-27 13:36:34 +000078void ProcessThreadImpl::Stop() {
henrikg91d6ede2015-09-17 00:24:34 -070079 RTC_DCHECK(thread_checker_.CalledOnValidThread());
Yves Gerey665174f2018-06-19 15:03:05 +020080 if (!thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000081 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000082
83 {
84 rtc::CritScope lock(&lock_);
85 stop_ = true;
86 }
87
Niels Möller2c16cc62018-10-29 09:47:51 +010088 wake_up_.Set();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000089
Peter Boström8c38e8b2015-11-26 17:45:47 +010090 thread_->Stop();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000091 stop_ = false;
92
Tommi7f375f02015-04-02 14:50:21 +000093 thread_.reset();
tommi@webrtc.org3985f012015-02-27 13:36:34 +000094 for (ModuleCallback& m : modules_)
95 m.module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +000096}
97
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000098void ProcessThreadImpl::WakeUp(Module* module) {
99 // Allowed to be called on any thread.
100 {
101 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000102 for (ModuleCallback& m : modules_) {
103 if (m.module == module)
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000104 m.next_callback = kCallProcessImmediately;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000105 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000106 }
Niels Möller2c16cc62018-10-29 09:47:51 +0100107 wake_up_.Set();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000108}
109
tommi435f98b2016-05-28 14:57:15 -0700110void ProcessThreadImpl::PostTask(std::unique_ptr<rtc::QueuedTask> task) {
tommi@webrtc.org03054482015-03-05 13:13:42 +0000111 // Allowed to be called on any thread.
112 {
113 rtc::CritScope lock(&lock_);
114 queue_.push(task.release());
115 }
Niels Möller2c16cc62018-10-29 09:47:51 +0100116 wake_up_.Set();
tommi@webrtc.org03054482015-03-05 13:13:42 +0000117}
118
tommidea489f2017-03-03 03:20:24 -0800119void ProcessThreadImpl::RegisterModule(Module* module,
120 const rtc::Location& from) {
henrikg91d6ede2015-09-17 00:24:34 -0700121 RTC_DCHECK(thread_checker_.CalledOnValidThread());
tommicf39dd52017-07-07 16:24:34 -0700122 RTC_DCHECK(module) << from.ToString();
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000123
kwiberg5377bc72016-10-04 13:46:56 -0700124#if RTC_DCHECK_IS_ON
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000125 {
126 // Catch programmer error.
127 rtc::CritScope lock(&lock_);
tommicf39dd52017-07-07 16:24:34 -0700128 for (const ModuleCallback& mc : modules_) {
129 RTC_DCHECK(mc.module != module)
130 << "Already registered here: " << mc.location.ToString() << "\n"
131 << "Now attempting from here: " << from.ToString();
132 }
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000133 }
134#endif
135
136 // Now that we know the module isn't in the list, we'll call out to notify
137 // the module that it's attached to the worker thread. We don't hold
138 // the lock while we make this call.
139 if (thread_.get())
140 module->ProcessThreadAttached(this);
141
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000142 {
143 rtc::CritScope lock(&lock_);
tommidea489f2017-03-03 03:20:24 -0800144 modules_.push_back(ModuleCallback(module, from));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000145 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000146
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000147 // Wake the thread calling ProcessThreadImpl::Process() to update the
148 // waiting time. The waiting time for the just registered module may be
149 // shorter than all other registered modules.
Niels Möller2c16cc62018-10-29 09:47:51 +0100150 wake_up_.Set();
niklase@google.com470e71d2011-07-07 08:21:25 +0000151}
152
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000153void ProcessThreadImpl::DeRegisterModule(Module* module) {
tommi0a2391f2017-03-21 02:31:51 -0700154 RTC_DCHECK(thread_checker_.CalledOnValidThread());
henrikg91d6ede2015-09-17 00:24:34 -0700155 RTC_DCHECK(module);
Tommi7f375f02015-04-02 14:50:21 +0000156
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000157 {
158 rtc::CritScope lock(&lock_);
Yves Gerey665174f2018-06-19 15:03:05 +0200159 modules_.remove_if(
160 [&module](const ModuleCallback& m) { return m.module == module; });
Tommi7f375f02015-04-02 14:50:21 +0000161 }
tommi0a2391f2017-03-21 02:31:51 -0700162
163 // Notify the module that it's been detached.
164 module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000165}
166
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000167// static
168bool ProcessThreadImpl::Run(void* obj) {
169 return static_cast<ProcessThreadImpl*>(obj)->Process();
niklase@google.com470e71d2011-07-07 08:21:25 +0000170}
171
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000172bool ProcessThreadImpl::Process() {
tommidea489f2017-03-03 03:20:24 -0800173 TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
Niels Möllerd28db7f2016-05-10 16:31:47 +0200174 int64_t now = rtc::TimeMillis();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000175 int64_t next_checkpoint = now + (1000 * 60);
tommi@webrtc.org03054482015-03-05 13:13:42 +0000176
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000177 {
178 rtc::CritScope lock(&lock_);
179 if (stop_)
180 return false;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000181 for (ModuleCallback& m : modules_) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000182 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
183 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
184 // operation should not require taking a lock, so querying all modules
185 // should run in a matter of nanoseconds.
186 if (m.next_callback == 0)
187 m.next_callback = GetNextCallbackTime(m.module, now);
niklase@google.com470e71d2011-07-07 08:21:25 +0000188
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000189 if (m.next_callback <= now ||
190 m.next_callback == kCallProcessImmediately) {
tommidea489f2017-03-03 03:20:24 -0800191 {
192 TRACE_EVENT2("webrtc", "ModuleProcess", "function",
193 m.location.function_name(), "file",
194 m.location.file_and_line());
195 m.module->Process();
196 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000197 // Use a new 'now' reference to calculate when the next callback
198 // should occur. We'll continue to use 'now' above for the baseline
199 // of calculating how long we should wait, to reduce variance.
Niels Möllerd28db7f2016-05-10 16:31:47 +0200200 int64_t new_now = rtc::TimeMillis();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000201 m.next_callback = GetNextCallbackTime(m.module, new_now);
202 }
203
204 if (m.next_callback < next_checkpoint)
205 next_checkpoint = m.next_callback;
niklase@google.com470e71d2011-07-07 08:21:25 +0000206 }
tommi@webrtc.org03054482015-03-05 13:13:42 +0000207
208 while (!queue_.empty()) {
tommi435f98b2016-05-28 14:57:15 -0700209 rtc::QueuedTask* task = queue_.front();
tommi@webrtc.org03054482015-03-05 13:13:42 +0000210 queue_.pop();
211 lock_.Leave();
212 task->Run();
213 delete task;
214 lock_.Enter();
215 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000216 }
217
Niels Möllerd28db7f2016-05-10 16:31:47 +0200218 int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000219 if (time_to_wait > 0)
Niels Möller2c16cc62018-10-29 09:47:51 +0100220 wake_up_.Wait(static_cast<int>(time_to_wait));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000221
222 return true;
niklase@google.com470e71d2011-07-07 08:21:25 +0000223}
pbos@webrtc.orgd900e8b2013-07-03 15:12:26 +0000224} // namespace webrtc