ProcessThread improvements.
* Added a way to notify a Module that it's been attached to a ProcessThread.
The benefit of this is to give the module a way to wake up the thread
when it needs work to happen on the worker thread, immediately.
Today, module instances are typically registered with a process thread
outside the control of the modules themselves. I.e. they typically
don't know about the process thread they're attached to.
* Improve ProcessThread's WakeUp algorithm to not call TimeUntilNextProcess
when a WakeUp call is requested. This is an optimization for the above
case which avoids the module having to acquire a lock or do an interlocked
operation before calling WakeUp(), which would ensure the module's
TimeUntilNextProcess() implementation would return 0.
BUG=2822
R=stefan@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/39239004
Cr-Commit-Position: refs/heads/master@{#8527}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8527 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc
index 1e498d4..c408b2c 100644
--- a/webrtc/modules/utility/source/process_thread_impl.cc
+++ b/webrtc/modules/utility/source/process_thread_impl.cc
@@ -17,6 +17,12 @@
namespace webrtc {
namespace {
+
+// We use this constant internally to signal that a module has requested
+// a callback right away. When this is set, no call to TimeUntilNextProcess
+// should be made, but Process() should be called directly.
+const int64_t kCallProcessImmediately = -1;
+
int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
int64_t interval = module->TimeUntilNextProcess();
// Currently some implementations erroneously return error codes from
@@ -47,28 +53,27 @@
DCHECK(!stop_);
}
-int32_t ProcessThreadImpl::Start() {
+void ProcessThreadImpl::Start() {
DCHECK(thread_checker_.CalledOnValidThread());
+ DCHECK(!thread_.get());
if (thread_.get())
- return -1;
+ return;
DCHECK(!stop_);
+ for (ModuleCallback& m : modules_)
+ m.module->ProcessThreadAttached(this);
+
thread_.reset(ThreadWrapper::CreateThread(
&ProcessThreadImpl::Run, this, kNormalPriority, "ProcessThread"));
unsigned int id;
- if (!thread_->Start(id)) {
- thread_.reset();
- return -1;
- }
-
- return 0;
+ CHECK(thread_->Start(id));
}
-int32_t ProcessThreadImpl::Stop() {
+void ProcessThreadImpl::Stop() {
DCHECK(thread_checker_.CalledOnValidThread());
if(!thread_.get())
- return 0;
+ return;
{
rtc::CritScope lock(&lock_);
@@ -77,11 +82,12 @@
wake_up_->Set();
- thread_->Stop();
+ CHECK(thread_->Stop());
thread_.reset();
stop_ = false;
- return 0;
+ for (ModuleCallback& m : modules_)
+ m.module->ProcessThreadAttached(nullptr);
}
void ProcessThreadImpl::WakeUp(Module* module) {
@@ -90,23 +96,33 @@
rtc::CritScope lock(&lock_);
for (ModuleCallback& m : modules_) {
if (m.module == module)
- m.next_callback = 0;
+ m.next_callback = kCallProcessImmediately;
}
}
wake_up_->Set();
}
-int32_t ProcessThreadImpl::RegisterModule(Module* module) {
+void ProcessThreadImpl::RegisterModule(Module* module) {
// Allowed to be called on any thread.
DCHECK(module);
+
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+ {
+ // Catch programmer error.
+ rtc::CritScope lock(&lock_);
+ for (const ModuleCallback& mc : modules_)
+ DCHECK(mc.module != module);
+ }
+#endif
+
+ // Now that we know the module isn't in the list, we'll call out to notify
+ // the module that it's attached to the worker thread. We don't hold
+ // the lock while we make this call.
+ if (thread_.get())
+ module->ProcessThreadAttached(this);
+
{
rtc::CritScope lock(&lock_);
- // Only allow module to be registered once.
- for (const ModuleCallback& mc : modules_) {
- if (mc.module == module)
- return -1;
- }
-
modules_.push_back(ModuleCallback(module));
}
@@ -114,18 +130,21 @@
// waiting time. The waiting time for the just registered module may be
// shorter than all other registered modules.
wake_up_->Set();
-
- return 0;
}
-int32_t ProcessThreadImpl::DeRegisterModule(const Module* module) {
+void ProcessThreadImpl::DeRegisterModule(Module* module) {
// Allowed to be called on any thread.
DCHECK(module);
- rtc::CritScope lock(&lock_);
- modules_.remove_if([&module](const ModuleCallback& m) {
- return m.module == module;
- });
- return 0;
+ {
+ rtc::CritScope lock(&lock_);
+ modules_.remove_if([&module](const ModuleCallback& m) {
+ return m.module == module;
+ });
+ }
+
+ // Notify the module that it's been detached, while not holding the lock.
+ if (thread_.get())
+ module->ProcessThreadAttached(nullptr);
}
// static
@@ -148,7 +167,8 @@
if (m.next_callback == 0)
m.next_callback = GetNextCallbackTime(m.module, now);
- if (m.next_callback <= now) {
+ if (m.next_callback <= now ||
+ m.next_callback == kCallProcessImmediately) {
m.module->Process();
// Use a new 'now' reference to calculate when the next callback
// should occur. We'll continue to use 'now' above for the baseline