parallel_emerge: ensure cleanup runs.
Specifically, lock down the queue shutdown pathways so they're reusable
and execute the same set of steps.
BUG=chromium-os:19208
TEST=manual verification of code paths
TEST=./setup_board --board=x86-alex
TEST=./build_packages --board=x86-alex
Change-Id: Ib6fab0ab6d6cf401f97a7ffc0619959050c1fc0a
Reviewed-on: https://gerrit.chromium.org/gerrit/20056
Tested-by: Brian Harring <ferringb@chromium.org>
Reviewed-by: David James <davidjames@chromium.org>
Commit-Ready: Brian Harring <ferringb@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index b6da1d1..aa6876b 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -1363,30 +1363,42 @@
self._Print("Retrying emerge of %s." % state.target)
break
- def _Exit(self):
+ def _Shutdown(self):
# Tell emerge workers to exit. They all exit when 'None' is pushed
# to the queue.
- self._fetch_queue.put(None)
- self._build_queue.put(None)
- self._fetch_pool.close()
- self._fetch_pool.join()
+ # Shutdown the workers first; then jobs (which is how they feed things back)
+ # then finally the print queue.
- self._build_pool.close()
- self._build_pool.join()
+ def _stop(queue, pool):
+ if pool is None:
+ return
+ try:
+ queue.put(None)
+ pool.close()
+ pool.join()
+ finally:
+ pool.terminate()
- self._build_queue.close()
- self._fetch_queue.close()
+ _stop(self._fetch_queue, self._fetch_pool)
+ self._fetch_queue = self._fetch_pool = None
- self._build_queue = self._fetch_queue = None
+ _stop(self._build_queue, self._build_pool)
+ self._build_queue = self._build_pool = None
+
+ if self._job_queue is not None:
+ self._job_queue.close()
+ self._job_queue = None
# Now that our workers are finished, we can kill the print queue.
- self._print_queue.put(None)
- self._print_worker.join()
- self._print_queue.close()
- self._print_queue = None
- self._job_queue.close()
- self._job_queue = None
+ if self._print_worker is not None:
+ try:
+ self._print_queue.put(None)
+ self._print_queue.close()
+ self._print_worker.join()
+ finally:
+ self._print_worker.terminate()
+ self._print_queue = self._print_worker = None
def Run(self):
"""Run through the scheduled ebuilds.
@@ -1394,6 +1406,9 @@
Keep running so long as we have uninstalled packages in the
dependency graph to merge.
"""
+ if not self._deps_map:
+ return
+
# Start the fetchers.
for _ in xrange(min(self._fetch_procs, len(self._fetch_ready))):
state = self._fetch_ready.get()
@@ -1417,9 +1432,6 @@
if self._retry_queue:
self._Retry()
else:
- # Tell child threads to exit.
- self._Exit()
-
# The dependency map is helpful for debugging failures.
PrintDepsMap(self._deps_map)
@@ -1536,7 +1548,6 @@
# Tell child threads to exit.
self._Print("Merge complete")
- self._Exit()
def main(argv):
@@ -1602,7 +1613,10 @@
# Run the queued emerges.
scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
- scheduler.Run()
+ try:
+ scheduler.Run()
+ finally:
+ scheduler._Shutdown()
scheduler = None
# If we already upgraded portage, we don't need to do so again. But we do