blob: 68c7ab676d6a7fadade0cc5c3ff115b7a57d55e7 [file] [log] [blame]
/*
 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
 *
 * Use of this source code is governed by a BSD-style license
 * that can be found in the LICENSE file in the root of the source
 * tree. An additional intellectual property rights grant can be found
 * in the file PATENTS. All contributing project authors may
 * be found in the AUTHORS file in the root of the source tree.
 */
10
pbos@webrtc.org8b062002013-07-12 08:28:10 +000011#include "webrtc/modules/utility/source/process_thread_impl.h"
asapersson@webrtc.org8b2ec152014-04-11 07:59:43 +000012
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000013#include "webrtc/base/checks.h"
Henrik Kjellanderff761fb2015-11-04 08:31:52 +010014#include "webrtc/modules/include/module.h"
Henrik Kjellander98f53512015-10-28 18:17:40 +010015#include "webrtc/system_wrappers/include/logging.h"
16#include "webrtc/system_wrappers/include/tick_util.h"
niklase@google.com470e71d2011-07-07 08:21:25 +000017
18namespace webrtc {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000019namespace {
tommi@webrtc.org3985f012015-02-27 13:36:34 +000020
21// We use this constant internally to signal that a module has requested
22// a callback right away. When this is set, no call to TimeUntilNextProcess
23// should be made, but Process() should be called directly.
24const int64_t kCallProcessImmediately = -1;
25
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000026int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
27 int64_t interval = module->TimeUntilNextProcess();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000028 if (interval < 0) {
pbos9e260f12015-08-20 01:23:48 -070029 // Falling behind, we should call the callback now.
30 return time_now;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000031 }
32 return time_now + interval;
33}
niklase@google.com470e71d2011-07-07 08:21:25 +000034}
35
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000036ProcessThread::~ProcessThread() {}
niklase@google.com470e71d2011-07-07 08:21:25 +000037
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000038// static
kwiberg84be5112016-04-27 01:19:58 -070039std::unique_ptr<ProcessThread> ProcessThread::Create(
stefan847855b2015-09-11 09:52:15 -070040 const char* thread_name) {
kwiberg84be5112016-04-27 01:19:58 -070041 return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
niklase@google.com470e71d2011-07-07 08:21:25 +000042}
43
stefan847855b2015-09-11 09:52:15 -070044ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
45 : wake_up_(EventWrapper::Create()),
46 stop_(false),
47 thread_name_(thread_name) {}
niklase@google.com470e71d2011-07-07 08:21:25 +000048
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000049ProcessThreadImpl::~ProcessThreadImpl() {
henrikg91d6ede2015-09-17 00:24:34 -070050 RTC_DCHECK(thread_checker_.CalledOnValidThread());
51 RTC_DCHECK(!thread_.get());
52 RTC_DCHECK(!stop_);
tommi@webrtc.org03054482015-03-05 13:13:42 +000053
54 while (!queue_.empty()) {
55 delete queue_.front();
56 queue_.pop();
57 }
niklase@google.com470e71d2011-07-07 08:21:25 +000058}
59
tommi@webrtc.org3985f012015-02-27 13:36:34 +000060void ProcessThreadImpl::Start() {
henrikg91d6ede2015-09-17 00:24:34 -070061 RTC_DCHECK(thread_checker_.CalledOnValidThread());
62 RTC_DCHECK(!thread_.get());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000063 if (thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000064 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000065
henrikg91d6ede2015-09-17 00:24:34 -070066 RTC_DCHECK(!stop_);
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000067
Tommi842a4a62015-03-30 14:16:12 +000068 {
69 // TODO(tommi): Since DeRegisterModule is currently being called from
70 // different threads in some cases (ChannelOwner), we need to lock access to
71 // the modules_ collection even on the controller thread.
72 // Once we've cleaned up those places, we can remove this lock.
73 rtc::CritScope lock(&lock_);
74 for (ModuleCallback& m : modules_)
75 m.module->ProcessThreadAttached(this);
76 }
tommi@webrtc.org3985f012015-02-27 13:36:34 +000077
Peter Boström8c38e8b2015-11-26 17:45:47 +010078 thread_.reset(
79 new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
80 thread_->Start();
niklase@google.com470e71d2011-07-07 08:21:25 +000081}
82
tommi@webrtc.org3985f012015-02-27 13:36:34 +000083void ProcessThreadImpl::Stop() {
henrikg91d6ede2015-09-17 00:24:34 -070084 RTC_DCHECK(thread_checker_.CalledOnValidThread());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000085 if(!thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000086 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000087
88 {
89 rtc::CritScope lock(&lock_);
90 stop_ = true;
91 }
92
93 wake_up_->Set();
94
Peter Boström8c38e8b2015-11-26 17:45:47 +010095 thread_->Stop();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000096 stop_ = false;
97
Tommi842a4a62015-03-30 14:16:12 +000098 // TODO(tommi): Since DeRegisterModule is currently being called from
99 // different threads in some cases (ChannelOwner), we need to lock access to
100 // the modules_ collection even on the controller thread.
Tommi7f375f02015-04-02 14:50:21 +0000101 // Since DeRegisterModule also checks thread_, we also need to hold the
102 // lock for the .reset() operation.
Tommi842a4a62015-03-30 14:16:12 +0000103 // Once we've cleaned up those places, we can remove this lock.
104 rtc::CritScope lock(&lock_);
Tommi7f375f02015-04-02 14:50:21 +0000105 thread_.reset();
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000106 for (ModuleCallback& m : modules_)
107 m.module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000108}
109
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000110void ProcessThreadImpl::WakeUp(Module* module) {
111 // Allowed to be called on any thread.
112 {
113 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000114 for (ModuleCallback& m : modules_) {
115 if (m.module == module)
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000116 m.next_callback = kCallProcessImmediately;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000117 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000118 }
119 wake_up_->Set();
120}
121
kwiberg84be5112016-04-27 01:19:58 -0700122void ProcessThreadImpl::PostTask(std::unique_ptr<ProcessTask> task) {
tommi@webrtc.org03054482015-03-05 13:13:42 +0000123 // Allowed to be called on any thread.
124 {
125 rtc::CritScope lock(&lock_);
126 queue_.push(task.release());
127 }
128 wake_up_->Set();
129}
130
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000131void ProcessThreadImpl::RegisterModule(Module* module) {
henrikg91d6ede2015-09-17 00:24:34 -0700132 RTC_DCHECK(thread_checker_.CalledOnValidThread());
133 RTC_DCHECK(module);
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000134
135#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
136 {
137 // Catch programmer error.
138 rtc::CritScope lock(&lock_);
139 for (const ModuleCallback& mc : modules_)
henrikg91d6ede2015-09-17 00:24:34 -0700140 RTC_DCHECK(mc.module != module);
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000141 }
142#endif
143
144 // Now that we know the module isn't in the list, we'll call out to notify
145 // the module that it's attached to the worker thread. We don't hold
146 // the lock while we make this call.
147 if (thread_.get())
148 module->ProcessThreadAttached(this);
149
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000150 {
151 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000152 modules_.push_back(ModuleCallback(module));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000153 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000154
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000155 // Wake the thread calling ProcessThreadImpl::Process() to update the
156 // waiting time. The waiting time for the just registered module may be
157 // shorter than all other registered modules.
158 wake_up_->Set();
niklase@google.com470e71d2011-07-07 08:21:25 +0000159}
160
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000161void ProcessThreadImpl::DeRegisterModule(Module* module) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000162 // Allowed to be called on any thread.
Tommi7f375f02015-04-02 14:50:21 +0000163 // TODO(tommi): Disallow this ^^^
henrikg91d6ede2015-09-17 00:24:34 -0700164 RTC_DCHECK(module);
Tommi7f375f02015-04-02 14:50:21 +0000165
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000166 {
167 rtc::CritScope lock(&lock_);
168 modules_.remove_if([&module](const ModuleCallback& m) {
169 return m.module == module;
170 });
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000171
Tommi7f375f02015-04-02 14:50:21 +0000172 // TODO(tommi): we currently need to hold the lock while calling out to
173 // ProcessThreadAttached. This is to make sure that the thread hasn't been
174 // destroyed while we attach the module. Once we can make sure
175 // DeRegisterModule isn't being called on arbitrary threads, we can move the
176 // |if (thread_.get())| check and ProcessThreadAttached() call outside the
177 // lock scope.
178
179 // Notify the module that it's been detached.
180 if (thread_.get())
181 module->ProcessThreadAttached(nullptr);
182 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000183}
184
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000185// static
186bool ProcessThreadImpl::Run(void* obj) {
187 return static_cast<ProcessThreadImpl*>(obj)->Process();
niklase@google.com470e71d2011-07-07 08:21:25 +0000188}
189
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000190bool ProcessThreadImpl::Process() {
191 int64_t now = TickTime::MillisecondTimestamp();
192 int64_t next_checkpoint = now + (1000 * 60);
tommi@webrtc.org03054482015-03-05 13:13:42 +0000193
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000194 {
195 rtc::CritScope lock(&lock_);
196 if (stop_)
197 return false;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000198 for (ModuleCallback& m : modules_) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000199 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
200 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
201 // operation should not require taking a lock, so querying all modules
202 // should run in a matter of nanoseconds.
203 if (m.next_callback == 0)
204 m.next_callback = GetNextCallbackTime(m.module, now);
niklase@google.com470e71d2011-07-07 08:21:25 +0000205
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000206 if (m.next_callback <= now ||
207 m.next_callback == kCallProcessImmediately) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000208 m.module->Process();
209 // Use a new 'now' reference to calculate when the next callback
210 // should occur. We'll continue to use 'now' above for the baseline
211 // of calculating how long we should wait, to reduce variance.
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000212 int64_t new_now = TickTime::MillisecondTimestamp();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000213 m.next_callback = GetNextCallbackTime(m.module, new_now);
214 }
215
216 if (m.next_callback < next_checkpoint)
217 next_checkpoint = m.next_callback;
niklase@google.com470e71d2011-07-07 08:21:25 +0000218 }
tommi@webrtc.org03054482015-03-05 13:13:42 +0000219
220 while (!queue_.empty()) {
221 ProcessTask* task = queue_.front();
222 queue_.pop();
223 lock_.Leave();
224 task->Run();
225 delete task;
226 lock_.Enter();
227 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000228 }
229
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000230 int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000231 if (time_to_wait > 0)
232 wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
233
234 return true;
niklase@google.com470e71d2011-07-07 08:21:25 +0000235}
pbos@webrtc.orgd900e8b2013-07-03 15:12:26 +0000236} // namespace webrtc