blob: 6ebf4a2f07c770191be3ee1673095de28b0f2989 [file] [log] [blame]
niklase@google.com470e71d2011-07-07 08:21:25 +00001/*
xians@webrtc.org6bde7a82012-02-20 08:39:25 +00002 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
niklase@google.com470e71d2011-07-07 08:21:25 +00003 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
pbos@webrtc.org8b062002013-07-12 08:28:10 +000011#include "webrtc/modules/utility/source/process_thread_impl.h"
asapersson@webrtc.org8b2ec152014-04-11 07:59:43 +000012
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000013#include "webrtc/base/checks.h"
14#include "webrtc/modules/interface/module.h"
15#include "webrtc/system_wrappers/interface/logging.h"
16#include "webrtc/system_wrappers/interface/tick_util.h"
niklase@google.com470e71d2011-07-07 08:21:25 +000017
18namespace webrtc {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000019namespace {
tommi@webrtc.org3985f012015-02-27 13:36:34 +000020
// Sentinel stored as a module's next-callback time to mean "this module asked
// to be called back right away". While a module carries this value,
// TimeUntilNextProcess() should not be consulted for it; its Process() should
// simply be invoked directly.
const int64_t kCallProcessImmediately = -1;
25
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000026int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
27 int64_t interval = module->TimeUntilNextProcess();
28 // Currently some implementations erroneously return error codes from
29 // TimeUntilNextProcess(). So, as is, we correct that and log an error.
30 if (interval < 0) {
31 LOG(LS_ERROR) << "TimeUntilNextProcess returned an invalid value "
32 << interval;
33 interval = 0;
34 }
35 return time_now + interval;
36}
niklase@google.com470e71d2011-07-07 08:21:25 +000037}
38
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000039ProcessThread::~ProcessThread() {}
niklase@google.com470e71d2011-07-07 08:21:25 +000040
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000041// static
42rtc::scoped_ptr<ProcessThread> ProcessThread::Create() {
43 return rtc::scoped_ptr<ProcessThread>(new ProcessThreadImpl()).Pass();
niklase@google.com470e71d2011-07-07 08:21:25 +000044}
45
46ProcessThreadImpl::ProcessThreadImpl()
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000047 : wake_up_(EventWrapper::Create()), stop_(false) {
niklase@google.com470e71d2011-07-07 08:21:25 +000048}
49
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000050ProcessThreadImpl::~ProcessThreadImpl() {
51 DCHECK(thread_checker_.CalledOnValidThread());
52 DCHECK(!thread_.get());
53 DCHECK(!stop_);
tommi@webrtc.org03054482015-03-05 13:13:42 +000054
55 while (!queue_.empty()) {
56 delete queue_.front();
57 queue_.pop();
58 }
niklase@google.com470e71d2011-07-07 08:21:25 +000059}
60
tommi@webrtc.org3985f012015-02-27 13:36:34 +000061void ProcessThreadImpl::Start() {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000062 DCHECK(thread_checker_.CalledOnValidThread());
tommi@webrtc.org3985f012015-02-27 13:36:34 +000063 DCHECK(!thread_.get());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000064 if (thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000065 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000066
67 DCHECK(!stop_);
68
Tommi842a4a62015-03-30 14:16:12 +000069 {
70 // TODO(tommi): Since DeRegisterModule is currently being called from
71 // different threads in some cases (ChannelOwner), we need to lock access to
72 // the modules_ collection even on the controller thread.
73 // Once we've cleaned up those places, we can remove this lock.
74 rtc::CritScope lock(&lock_);
75 for (ModuleCallback& m : modules_)
76 m.module->ProcessThreadAttached(this);
77 }
tommi@webrtc.org3985f012015-02-27 13:36:34 +000078
tommi@webrtc.org361981f2015-03-19 14:44:18 +000079 thread_ = ThreadWrapper::CreateThread(
tommi@webrtc.org38492c52015-03-22 14:41:46 +000080 &ProcessThreadImpl::Run, this, "ProcessThread");
pbos@webrtc.org86639732015-03-13 00:06:21 +000081 CHECK(thread_->Start());
niklase@google.com470e71d2011-07-07 08:21:25 +000082}
83
tommi@webrtc.org3985f012015-02-27 13:36:34 +000084void ProcessThreadImpl::Stop() {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000085 DCHECK(thread_checker_.CalledOnValidThread());
86 if(!thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000087 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000088
89 {
90 rtc::CritScope lock(&lock_);
91 stop_ = true;
92 }
93
94 wake_up_->Set();
95
tommi@webrtc.org3985f012015-02-27 13:36:34 +000096 CHECK(thread_->Stop());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000097 thread_.reset();
98 stop_ = false;
99
Tommi842a4a62015-03-30 14:16:12 +0000100 // TODO(tommi): Since DeRegisterModule is currently being called from
101 // different threads in some cases (ChannelOwner), we need to lock access to
102 // the modules_ collection even on the controller thread.
103 // Once we've cleaned up those places, we can remove this lock.
104 rtc::CritScope lock(&lock_);
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000105 for (ModuleCallback& m : modules_)
106 m.module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000107}
108
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000109void ProcessThreadImpl::WakeUp(Module* module) {
110 // Allowed to be called on any thread.
Tommi842a4a62015-03-30 14:16:12 +0000111 // TODO(tommi): Disallow this ^^^
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000112 {
113 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000114 for (ModuleCallback& m : modules_) {
115 if (m.module == module)
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000116 m.next_callback = kCallProcessImmediately;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000117 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000118 }
119 wake_up_->Set();
120}
121
tommi@webrtc.org03054482015-03-05 13:13:42 +0000122void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
123 // Allowed to be called on any thread.
Tommi842a4a62015-03-30 14:16:12 +0000124 // TODO(tommi): Disallow this ^^^
tommi@webrtc.org03054482015-03-05 13:13:42 +0000125 {
126 rtc::CritScope lock(&lock_);
127 queue_.push(task.release());
128 }
129 wake_up_->Set();
130}
131
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000132void ProcessThreadImpl::RegisterModule(Module* module) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000133 // Allowed to be called on any thread.
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000134 DCHECK(module);
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000135
136#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
137 {
138 // Catch programmer error.
139 rtc::CritScope lock(&lock_);
140 for (const ModuleCallback& mc : modules_)
141 DCHECK(mc.module != module);
142 }
143#endif
144
145 // Now that we know the module isn't in the list, we'll call out to notify
146 // the module that it's attached to the worker thread. We don't hold
147 // the lock while we make this call.
148 if (thread_.get())
149 module->ProcessThreadAttached(this);
150
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000151 {
152 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000153 modules_.push_back(ModuleCallback(module));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000154 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000155
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000156 // Wake the thread calling ProcessThreadImpl::Process() to update the
157 // waiting time. The waiting time for the just registered module may be
158 // shorter than all other registered modules.
159 wake_up_->Set();
niklase@google.com470e71d2011-07-07 08:21:25 +0000160}
161
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000162void ProcessThreadImpl::DeRegisterModule(Module* module) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000163 // Allowed to be called on any thread.
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000164 DCHECK(module);
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000165 {
166 rtc::CritScope lock(&lock_);
167 modules_.remove_if([&module](const ModuleCallback& m) {
168 return m.module == module;
169 });
170 }
171
172 // Notify the module that it's been detached, while not holding the lock.
173 if (thread_.get())
174 module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000175}
176
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000177// static
178bool ProcessThreadImpl::Run(void* obj) {
179 return static_cast<ProcessThreadImpl*>(obj)->Process();
niklase@google.com470e71d2011-07-07 08:21:25 +0000180}
181
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000182bool ProcessThreadImpl::Process() {
183 int64_t now = TickTime::MillisecondTimestamp();
184 int64_t next_checkpoint = now + (1000 * 60);
tommi@webrtc.org03054482015-03-05 13:13:42 +0000185
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000186 {
187 rtc::CritScope lock(&lock_);
188 if (stop_)
189 return false;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000190 for (ModuleCallback& m : modules_) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000191 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
192 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
193 // operation should not require taking a lock, so querying all modules
194 // should run in a matter of nanoseconds.
195 if (m.next_callback == 0)
196 m.next_callback = GetNextCallbackTime(m.module, now);
niklase@google.com470e71d2011-07-07 08:21:25 +0000197
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000198 if (m.next_callback <= now ||
199 m.next_callback == kCallProcessImmediately) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000200 m.module->Process();
201 // Use a new 'now' reference to calculate when the next callback
202 // should occur. We'll continue to use 'now' above for the baseline
203 // of calculating how long we should wait, to reduce variance.
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000204 int64_t new_now = TickTime::MillisecondTimestamp();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000205 m.next_callback = GetNextCallbackTime(m.module, new_now);
206 }
207
208 if (m.next_callback < next_checkpoint)
209 next_checkpoint = m.next_callback;
niklase@google.com470e71d2011-07-07 08:21:25 +0000210 }
tommi@webrtc.org03054482015-03-05 13:13:42 +0000211
212 while (!queue_.empty()) {
213 ProcessTask* task = queue_.front();
214 queue_.pop();
215 lock_.Leave();
216 task->Run();
217 delete task;
218 lock_.Enter();
219 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000220 }
221
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000222 int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000223 if (time_to_wait > 0)
224 wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
225
226 return true;
niklase@google.com470e71d2011-07-07 08:21:25 +0000227}
pbos@webrtc.orgd900e8b2013-07-03 15:12:26 +0000228} // namespace webrtc