blob: 947fdb0db6a4cd82c053e46c75f7a97078d41af2 [file] [log] [blame]
niklase@google.com470e71d2011-07-07 08:21:25 +00001/*
xians@webrtc.org6bde7a82012-02-20 08:39:25 +00002 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
niklase@google.com470e71d2011-07-07 08:21:25 +00003 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
pbos@webrtc.org8b062002013-07-12 08:28:10 +000011#include "webrtc/modules/utility/source/process_thread_impl.h"
asapersson@webrtc.org8b2ec152014-04-11 07:59:43 +000012
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000013#include "webrtc/base/checks.h"
14#include "webrtc/modules/interface/module.h"
15#include "webrtc/system_wrappers/interface/logging.h"
16#include "webrtc/system_wrappers/interface/tick_util.h"
niklase@google.com470e71d2011-07-07 08:21:25 +000017
18namespace webrtc {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000019namespace {
tommi@webrtc.org3985f012015-02-27 13:36:34 +000020
21// We use this constant internally to signal that a module has requested
22// a callback right away. When this is set, no call to TimeUntilNextProcess
23// should be made, but Process() should be called directly.
24const int64_t kCallProcessImmediately = -1;
25
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000026int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
27 int64_t interval = module->TimeUntilNextProcess();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000028 if (interval < 0) {
pbos9e260f12015-08-20 01:23:48 -070029 // Falling behind, we should call the callback now.
30 return time_now;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000031 }
32 return time_now + interval;
33}
niklase@google.com470e71d2011-07-07 08:21:25 +000034}
35
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000036ProcessThread::~ProcessThread() {}
niklase@google.com470e71d2011-07-07 08:21:25 +000037
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000038// static
39rtc::scoped_ptr<ProcessThread> ProcessThread::Create() {
40 return rtc::scoped_ptr<ProcessThread>(new ProcessThreadImpl()).Pass();
niklase@google.com470e71d2011-07-07 08:21:25 +000041}
42
43ProcessThreadImpl::ProcessThreadImpl()
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000044 : wake_up_(EventWrapper::Create()), stop_(false) {
niklase@google.com470e71d2011-07-07 08:21:25 +000045}
46
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000047ProcessThreadImpl::~ProcessThreadImpl() {
48 DCHECK(thread_checker_.CalledOnValidThread());
49 DCHECK(!thread_.get());
50 DCHECK(!stop_);
tommi@webrtc.org03054482015-03-05 13:13:42 +000051
52 while (!queue_.empty()) {
53 delete queue_.front();
54 queue_.pop();
55 }
niklase@google.com470e71d2011-07-07 08:21:25 +000056}
57
tommi@webrtc.org3985f012015-02-27 13:36:34 +000058void ProcessThreadImpl::Start() {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000059 DCHECK(thread_checker_.CalledOnValidThread());
tommi@webrtc.org3985f012015-02-27 13:36:34 +000060 DCHECK(!thread_.get());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000061 if (thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000062 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000063
64 DCHECK(!stop_);
65
Tommi842a4a62015-03-30 14:16:12 +000066 {
67 // TODO(tommi): Since DeRegisterModule is currently being called from
68 // different threads in some cases (ChannelOwner), we need to lock access to
69 // the modules_ collection even on the controller thread.
70 // Once we've cleaned up those places, we can remove this lock.
71 rtc::CritScope lock(&lock_);
72 for (ModuleCallback& m : modules_)
73 m.module->ProcessThreadAttached(this);
74 }
tommi@webrtc.org3985f012015-02-27 13:36:34 +000075
tommi@webrtc.org361981f2015-03-19 14:44:18 +000076 thread_ = ThreadWrapper::CreateThread(
tommi@webrtc.org38492c52015-03-22 14:41:46 +000077 &ProcessThreadImpl::Run, this, "ProcessThread");
pbos@webrtc.org86639732015-03-13 00:06:21 +000078 CHECK(thread_->Start());
niklase@google.com470e71d2011-07-07 08:21:25 +000079}
80
tommi@webrtc.org3985f012015-02-27 13:36:34 +000081void ProcessThreadImpl::Stop() {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000082 DCHECK(thread_checker_.CalledOnValidThread());
83 if(!thread_.get())
tommi@webrtc.org3985f012015-02-27 13:36:34 +000084 return;
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000085
86 {
87 rtc::CritScope lock(&lock_);
88 stop_ = true;
89 }
90
91 wake_up_->Set();
92
tommi@webrtc.org3985f012015-02-27 13:36:34 +000093 CHECK(thread_->Stop());
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +000094 stop_ = false;
95
Tommi842a4a62015-03-30 14:16:12 +000096 // TODO(tommi): Since DeRegisterModule is currently being called from
97 // different threads in some cases (ChannelOwner), we need to lock access to
98 // the modules_ collection even on the controller thread.
Tommi7f375f02015-04-02 14:50:21 +000099 // Since DeRegisterModule also checks thread_, we also need to hold the
100 // lock for the .reset() operation.
Tommi842a4a62015-03-30 14:16:12 +0000101 // Once we've cleaned up those places, we can remove this lock.
102 rtc::CritScope lock(&lock_);
Tommi7f375f02015-04-02 14:50:21 +0000103 thread_.reset();
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000104 for (ModuleCallback& m : modules_)
105 m.module->ProcessThreadAttached(nullptr);
niklase@google.com470e71d2011-07-07 08:21:25 +0000106}
107
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000108void ProcessThreadImpl::WakeUp(Module* module) {
109 // Allowed to be called on any thread.
110 {
111 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000112 for (ModuleCallback& m : modules_) {
113 if (m.module == module)
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000114 m.next_callback = kCallProcessImmediately;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000115 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000116 }
117 wake_up_->Set();
118}
119
tommi@webrtc.org03054482015-03-05 13:13:42 +0000120void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
121 // Allowed to be called on any thread.
122 {
123 rtc::CritScope lock(&lock_);
124 queue_.push(task.release());
125 }
126 wake_up_->Set();
127}
128
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000129void ProcessThreadImpl::RegisterModule(Module* module) {
Tommibc4b9342015-04-02 18:34:37 +0000130 DCHECK(thread_checker_.CalledOnValidThread());
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000131 DCHECK(module);
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000132
133#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
134 {
135 // Catch programmer error.
136 rtc::CritScope lock(&lock_);
137 for (const ModuleCallback& mc : modules_)
138 DCHECK(mc.module != module);
139 }
140#endif
141
142 // Now that we know the module isn't in the list, we'll call out to notify
143 // the module that it's attached to the worker thread. We don't hold
144 // the lock while we make this call.
145 if (thread_.get())
146 module->ProcessThreadAttached(this);
147
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000148 {
149 rtc::CritScope lock(&lock_);
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000150 modules_.push_back(ModuleCallback(module));
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000151 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000152
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000153 // Wake the thread calling ProcessThreadImpl::Process() to update the
154 // waiting time. The waiting time for the just registered module may be
155 // shorter than all other registered modules.
156 wake_up_->Set();
niklase@google.com470e71d2011-07-07 08:21:25 +0000157}
158
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000159void ProcessThreadImpl::DeRegisterModule(Module* module) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000160 // Allowed to be called on any thread.
Tommi7f375f02015-04-02 14:50:21 +0000161 // TODO(tommi): Disallow this ^^^
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000162 DCHECK(module);
Tommi7f375f02015-04-02 14:50:21 +0000163
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000164 {
165 rtc::CritScope lock(&lock_);
166 modules_.remove_if([&module](const ModuleCallback& m) {
167 return m.module == module;
168 });
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000169
Tommi7f375f02015-04-02 14:50:21 +0000170 // TODO(tommi): we currently need to hold the lock while calling out to
171 // ProcessThreadAttached. This is to make sure that the thread hasn't been
172 // destroyed while we attach the module. Once we can make sure
173 // DeRegisterModule isn't being called on arbitrary threads, we can move the
174 // |if (thread_.get())| check and ProcessThreadAttached() call outside the
175 // lock scope.
176
177 // Notify the module that it's been detached.
178 if (thread_.get())
179 module->ProcessThreadAttached(nullptr);
180 }
niklase@google.com470e71d2011-07-07 08:21:25 +0000181}
182
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000183// static
184bool ProcessThreadImpl::Run(void* obj) {
185 return static_cast<ProcessThreadImpl*>(obj)->Process();
niklase@google.com470e71d2011-07-07 08:21:25 +0000186}
187
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000188bool ProcessThreadImpl::Process() {
189 int64_t now = TickTime::MillisecondTimestamp();
190 int64_t next_checkpoint = now + (1000 * 60);
tommi@webrtc.org03054482015-03-05 13:13:42 +0000191
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000192 {
193 rtc::CritScope lock(&lock_);
194 if (stop_)
195 return false;
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000196 for (ModuleCallback& m : modules_) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000197 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
198 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
199 // operation should not require taking a lock, so querying all modules
200 // should run in a matter of nanoseconds.
201 if (m.next_callback == 0)
202 m.next_callback = GetNextCallbackTime(m.module, now);
niklase@google.com470e71d2011-07-07 08:21:25 +0000203
tommi@webrtc.org3985f012015-02-27 13:36:34 +0000204 if (m.next_callback <= now ||
205 m.next_callback == kCallProcessImmediately) {
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000206 m.module->Process();
207 // Use a new 'now' reference to calculate when the next callback
208 // should occur. We'll continue to use 'now' above for the baseline
209 // of calculating how long we should wait, to reduce variance.
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000210 int64_t new_now = TickTime::MillisecondTimestamp();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000211 m.next_callback = GetNextCallbackTime(m.module, new_now);
212 }
213
214 if (m.next_callback < next_checkpoint)
215 next_checkpoint = m.next_callback;
niklase@google.com470e71d2011-07-07 08:21:25 +0000216 }
tommi@webrtc.org03054482015-03-05 13:13:42 +0000217
218 while (!queue_.empty()) {
219 ProcessTask* task = queue_.front();
220 queue_.pop();
221 lock_.Leave();
222 task->Run();
223 delete task;
224 lock_.Enter();
225 }
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000226 }
227
tommi@webrtc.org103f3282015-02-08 00:48:10 +0000228 int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
tommi@webrtc.org0c3e12b2015-02-06 09:44:12 +0000229 if (time_to_wait > 0)
230 wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
231
232 return true;
niklase@google.com470e71d2011-07-07 08:21:25 +0000233}
pbos@webrtc.orgd900e8b2013-07-03 15:12:26 +0000234} // namespace webrtc