ProcessThread improvements.

* Added a way to notify a Module that it's been attached to a ProcessThread.
  The benefit of this is to give the module a way to wake up the thread
  when it needs work to happen on the worker thread, immediately.
  Today, module instances are typically registered with a process thread
  outside the control of the modules themselves.  I.e. they typically
  don't know about the process thread they're attached to.

* Improve ProcessThread's WakeUp algorithm to not call TimeUntilNextProcess
  when a WakeUp call is requested.  This is an optimization for the above
  case which avoids the module having to acquire a lock or do an interlocked
  operation before calling WakeUp(), which would ensure the module's
  TimeUntilNextProcess() implementation would return 0.

BUG=2822
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/39239004

Cr-Commit-Position: refs/heads/master@{#8527}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8527 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc
index 1e498d4..c408b2c 100644
--- a/webrtc/modules/utility/source/process_thread_impl.cc
+++ b/webrtc/modules/utility/source/process_thread_impl.cc
@@ -17,6 +17,12 @@
 
 namespace webrtc {
 namespace {
+
+// We use this constant internally to signal that a module has requested
+// a callback right away.  When this is set, no call to TimeUntilNextProcess
+// should be made, but Process() should be called directly.
+const int64_t kCallProcessImmediately = -1;
+
 int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
   int64_t interval = module->TimeUntilNextProcess();
   // Currently some implementations erroneously return error codes from
@@ -47,28 +53,27 @@
   DCHECK(!stop_);
 }
 
-int32_t ProcessThreadImpl::Start() {
+void ProcessThreadImpl::Start() {
   DCHECK(thread_checker_.CalledOnValidThread());
+  DCHECK(!thread_.get());
   if (thread_.get())
-    return -1;
+    return;
 
   DCHECK(!stop_);
 
+  for (ModuleCallback& m : modules_)
+    m.module->ProcessThreadAttached(this);
+
   thread_.reset(ThreadWrapper::CreateThread(
       &ProcessThreadImpl::Run, this, kNormalPriority, "ProcessThread"));
   unsigned int id;
-  if (!thread_->Start(id)) {
-    thread_.reset();
-    return -1;
-  }
-
-  return 0;
+  CHECK(thread_->Start(id));
 }
 
-int32_t ProcessThreadImpl::Stop() {
+void ProcessThreadImpl::Stop() {
   DCHECK(thread_checker_.CalledOnValidThread());
   if(!thread_.get())
-    return 0;
+    return;
 
   {
     rtc::CritScope lock(&lock_);
@@ -77,11 +82,12 @@
 
   wake_up_->Set();
 
-  thread_->Stop();
+  CHECK(thread_->Stop());
   thread_.reset();
   stop_ = false;
 
-  return 0;
+  for (ModuleCallback& m : modules_)
+    m.module->ProcessThreadAttached(nullptr);
 }
 
 void ProcessThreadImpl::WakeUp(Module* module) {
@@ -90,23 +96,33 @@
     rtc::CritScope lock(&lock_);
     for (ModuleCallback& m : modules_) {
       if (m.module == module)
-        m.next_callback = 0;
+        m.next_callback = kCallProcessImmediately;
     }
   }
   wake_up_->Set();
 }
 
-int32_t ProcessThreadImpl::RegisterModule(Module* module) {
+void ProcessThreadImpl::RegisterModule(Module* module) {
   // Allowed to be called on any thread.
   DCHECK(module);
+
+#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
+  {
+    // Catch programmer error.
+    rtc::CritScope lock(&lock_);
+    for (const ModuleCallback& mc : modules_)
+      DCHECK(mc.module != module);
+  }
+#endif
+
+  // Now that we know the module isn't in the list, we'll call out to notify
+  // the module that it's attached to the worker thread.  We don't hold
+  // the lock while we make this call.
+  if (thread_.get())
+    module->ProcessThreadAttached(this);
+
   {
     rtc::CritScope lock(&lock_);
-    // Only allow module to be registered once.
-    for (const ModuleCallback& mc : modules_) {
-      if (mc.module == module)
-        return -1;
-    }
-
     modules_.push_back(ModuleCallback(module));
   }
 
@@ -114,18 +130,21 @@
   // waiting time. The waiting time for the just registered module may be
   // shorter than all other registered modules.
   wake_up_->Set();
-
-  return 0;
 }
 
-int32_t ProcessThreadImpl::DeRegisterModule(const Module* module) {
+void ProcessThreadImpl::DeRegisterModule(Module* module) {
   // Allowed to be called on any thread.
   DCHECK(module);
-  rtc::CritScope lock(&lock_);
-  modules_.remove_if([&module](const ModuleCallback& m) {
-      return m.module == module;
-    });
-  return 0;
+  {
+    rtc::CritScope lock(&lock_);
+    modules_.remove_if([&module](const ModuleCallback& m) {
+        return m.module == module;
+      });
+  }
+
+  // Notify the module that it's been detached, while not holding the lock.
+  if (thread_.get())
+    module->ProcessThreadAttached(nullptr);
 }
 
 // static
@@ -148,7 +167,8 @@
       if (m.next_callback == 0)
         m.next_callback = GetNextCallbackTime(m.module, now);
 
-      if (m.next_callback <= now) {
+      if (m.next_callback <= now ||
+          m.next_callback == kCallProcessImmediately) {
         m.module->Process();
         // Use a new 'now' reference to calculate when the next callback
         // should occur.  We'll continue to use 'now' above for the baseline