parallel_emerge: ensure cleanup runs.

Specifically, lock down the queue shutdown pathways so they're reusable
and executing the same set of steps.

BUG=chromium-os:19208
TEST=manual verification of code paths
TEST=./setup_board --board=x86-alex
TEST=./build_packages --board=x86-alex

Change-Id: Ib6fab0ab6d6cf401f97a7ffc0619959050c1fc0a
Reviewed-on: https://gerrit.chromium.org/gerrit/20056
Tested-by: Brian Harring <ferringb@chromium.org>
Reviewed-by: David James <davidjames@chromium.org>
Commit-Ready: Brian Harring <ferringb@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index b6da1d1..aa6876b 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -1363,30 +1363,42 @@
         self._Print("Retrying emerge of %s." % state.target)
         break
 
-  def _Exit(self):
+  def _Shutdown(self):
     # Tell emerge workers to exit. They all exit when 'None' is pushed
     # to the queue.
-    self._fetch_queue.put(None)
-    self._build_queue.put(None)
 
-    self._fetch_pool.close()
-    self._fetch_pool.join()
+    # Shutdown the workers first; then jobs (which is how they feed things back)
+    # then finally the print queue.
 
-    self._build_pool.close()
-    self._build_pool.join()
+    def _stop(queue, pool):
+      if pool is None:
+        return
+      try:
+        queue.put(None)
+        pool.close()
+        pool.join()
+      finally:
+        pool.terminate()
 
-    self._build_queue.close()
-    self._fetch_queue.close()
+    _stop(self._fetch_queue, self._fetch_pool)
+    self._fetch_queue = self._fetch_pool = None
 
-    self._build_queue = self._fetch_queue = None
+    _stop(self._build_queue, self._build_pool)
+    self._build_queue = self._build_pool = None
+
+    if self._job_queue is not None:
+      self._job_queue.close()
+      self._job_queue = None
 
     # Now that our workers are finished, we can kill the print queue.
-    self._print_queue.put(None)
-    self._print_worker.join()
-    self._print_queue.close()
-    self._print_queue = None
-    self._job_queue.close()
-    self._job_queue = None
+    if self._print_worker is not None:
+      try:
+        self._print_queue.put(None)
+        self._print_queue.close()
+        self._print_worker.join()
+      finally:
+        self._print_worker.terminate()
+    self._print_queue = self._print_worker = None
 
   def Run(self):
     """Run through the scheduled ebuilds.
@@ -1394,6 +1406,9 @@
     Keep running so long as we have uninstalled packages in the
     dependency graph to merge.
     """
+    if not self._deps_map:
+      return
+
     # Start the fetchers.
     for _ in xrange(min(self._fetch_procs, len(self._fetch_ready))):
       state = self._fetch_ready.get()
@@ -1417,9 +1432,6 @@
         if self._retry_queue:
           self._Retry()
         else:
-          # Tell child threads to exit.
-          self._Exit()
-
           # The dependency map is helpful for debugging failures.
           PrintDepsMap(self._deps_map)
 
@@ -1536,7 +1548,6 @@
 
     # Tell child threads to exit.
     self._Print("Merge complete")
-    self._Exit()
 
 
 def main(argv):
@@ -1602,7 +1613,10 @@
 
   # Run the queued emerges.
   scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
-  scheduler.Run()
+  try:
+    scheduler.Run()
+  finally:
+    scheduler._Shutdown()
   scheduler = None
 
   # If we already upgraded portage, we don't need to do so again. But we do