parallel_emerge: add --unpackonly.

The --unpackonly flag makes parallel_emerge fetch and then unpack all
packages without emerging them. This is accomplished by adding an unpack
queue which, when --unpackonly is set, is filled as fetch jobs complete.
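
Per package, the unpack step is roughly equivalent to the following
pipeline (a sketch: PKGDIR defaults to the sysroot's packages directory,
ROOT to the sysroot itself, and pbzip2 may be used in place of bzip2):

  bzip2 -dc ${PKGDIR}/<category>/<package>.tbz2 | sudo tar -xf - -C ${ROOT}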

When we want to work with only the binary packages of a project, the
current workflow is to run with --fetchonly and then extract each package
by hand. With --unpackonly, the unpacking is parallelized with the
fetching, making the entire workflow faster.
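
For example (board name and package atom are placeholders):

  # Old workflow: fetch, then extract each binary package by hand.
  parallel_emerge --board=${BOARD} --fetchonly some-category/some-package

  # New workflow: fetch and unpack in parallel.
  parallel_emerge --board=${BOARD} --unpackonly some-category/some-package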

BUG=chromium:264240
TEST=Compared the sysroot produced with --unpackonly against one produced
     with --fetchonly followed by manually unpacking each package.

Change-Id: I3d9acb7032f036b786e3cba9843b7720471eecdb
Signed-off-by: Thiago Goncales <thiagog@chromium.org>
Reviewed-on: https://gerrit.chromium.org/gerrit/62856
Reviewed-by: David James <davidjames@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 5e5833b..f536b18 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -29,6 +29,8 @@
 import time
 import traceback
 
+from chromite.lib import cros_build_lib
+
 # If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
 # Chromium OS, the default "portage" user doesn't have the necessary
 # permissions. It'd be easier if we could default to $USERNAME, but $USERNAME
@@ -203,13 +205,14 @@
     PrintDepsMap(deps_graph)
   """
 
-  __slots__ = ["board", "emerge", "package_db", "show_output"]
+  __slots__ = ["board", "emerge", "package_db", "show_output", "unpack_only"]
 
   def __init__(self):
     self.board = None
     self.emerge = EmergeData()
     self.package_db = {}
     self.show_output = False
+    self.unpack_only = False
 
   def ParseParallelEmergeArgs(self, argv):
     """Read the parallel emerge arguments from the command-line.
@@ -239,6 +242,9 @@
         self.show_output = True
       elif arg == "--rebuild":
         emerge_args.append("--rebuild-if-unbuilt")
+      elif arg == "--unpackonly":
+        emerge_args.append("--fetchonly")
+        self.unpack_only = True
       else:
         # Not one of our options, so pass through to emerge.
         emerge_args.append(arg)
@@ -792,10 +798,10 @@
 class EmergeJobState(object):
   __slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
                "last_output_timestamp", "pkgname", "retcode", "start_timestamp",
-               "target", "fetch_only"]
+               "target", "fetch_only", "unpack_only"]
 
   def __init__(self, target, pkgname, done, filename, start_timestamp,
-               retcode=None, fetch_only=False):
+               retcode=None, fetch_only=False, unpack_only=False):
 
     # The full name of the target we're building (e.g.
     # chromeos-base/chromeos-0.0.1-r60)
@@ -833,6 +839,9 @@
     # The timestamp when our job started.
     self.start_timestamp = start_timestamp
 
+    # Whether to only unpack the package instead of emerging it.
+    self.unpack_only = unpack_only
+
 
 def KillHandler(_signum, _frame):
   # Kill self and all subprocesses.
@@ -916,7 +925,42 @@
     # Return the exit code of the subprocess.
     return os.waitpid(pid, 0)[1]
 
-def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
+
+def UnpackPackage(pkg_state):
+  """Unpacks package described by pkg_state.
+
+  Args:
+    pkg_state: EmergeJobState object describing target.
+
+  Returns:
+    Exit code returned by subprocess.
+  """
+  pkgdir = os.environ.get("PKGDIR",
+                          os.path.join(os.environ["SYSROOT"], "packages"))
+  root = os.environ.get("ROOT", os.environ["SYSROOT"])
+  path = os.path.join(pkgdir, pkg_state.target + ".tbz2")
+  comp = cros_build_lib.FindCompressor(cros_build_lib.COMP_BZIP2)
+  cmd = [comp, "-dc"]
+  if comp.endswith("pbzip2"):
+    cmd.append("--ignore-trailing-garbage=1")
+  cmd.append(path)
+
+  result = cros_build_lib.RunCommand(cmd, cwd=root, stdout_to_pipe=True,
+                                     print_cmd=False, error_code_ok=True)
+
+  # If decompression failed, return now and don't attempt the untar.
+  if result.returncode:
+    return result.returncode
+
+  cmd = ["sudo", "tar", "-xf", "-", "-C", root]
+  result = cros_build_lib.RunCommand(cmd, cwd=root, input=result.output,
+                                     print_cmd=False, error_code_ok=True)
+
+  return result.returncode
+
+
+def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False,
+                 unpack_only=False):
   """This worker emerges any packages given to it on the task_queue.
 
   Args:
@@ -925,6 +969,7 @@
     emerge: An EmergeData() object.
     package_db: A dict, mapping package ids to portage Package objects.
     fetch_only: A bool, indicating if we should just fetch the target.
+    unpack_only: A bool, indicating if we should just unpack the target.
 
   It expects package identifiers to be passed to it via task_queue. When
   a task is started, it pushes the (target, filename) to the started_queue.
@@ -982,16 +1027,19 @@
     os.chmod(output.name, 644)
     start_timestamp = time.time()
     job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
-                         fetch_only=fetch_only)
+                         fetch_only=fetch_only, unpack_only=unpack_only)
     job_queue.put(job)
     if "--pretend" in opts:
       retcode = 0
     else:
       try:
         emerge.scheduler_graph.mergelist = install_list
-        retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
-            spinner, favorites=emerge.favorites,
-            graph_config=emerge.scheduler_graph)
+        if unpack_only:
+          retcode = UnpackPackage(pkg_state)
+        else:
+          retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
+                                  spinner, favorites=emerge.favorites,
+                                  graph_config=emerge.scheduler_graph)
       except Exception:
         traceback.print_exc(file=output)
         retcode = 1
@@ -1001,7 +1049,8 @@
       return
 
     job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
-                         retcode, fetch_only=fetch_only)
+                         retcode, fetch_only=fetch_only,
+                         unpack_only=unpack_only)
     job_queue.put(job)
 
 
@@ -1176,7 +1225,7 @@
 class EmergeQueue(object):
   """Class to schedule emerge jobs according to a dependency graph."""
 
-  def __init__(self, deps_map, emerge, package_db, show_output):
+  def __init__(self, deps_map, emerge, package_db, show_output, unpack_only):
     # Store the dependency graph.
     self._deps_map = deps_map
     self._state_map = {}
@@ -1185,10 +1234,13 @@
     self._build_ready = ScoredHeap()
     self._fetch_jobs = {}
     self._fetch_ready = ScoredHeap()
+    self._unpack_jobs = {}
+    self._unpack_ready = ScoredHeap()
     # List of total package installs represented in deps_map.
     install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
     self._total_jobs = len(install_jobs)
     self._show_output = show_output
+    self._unpack_only = unpack_only
 
     if "--pretend" in emerge.opts:
       print "Skipping merge because of --pretend mode."
@@ -1207,7 +1259,7 @@
     # jobs.
     procs = min(self._total_jobs,
                 emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
-    self._build_procs = self._fetch_procs = max(1, procs)
+    self._build_procs = self._unpack_procs = self._fetch_procs = max(1, procs)
     self._load_avg = emerge.opts.pop("--load-average", None)
     self._job_queue = multiprocessing.Queue()
     self._print_queue = multiprocessing.Queue()
@@ -1222,6 +1274,14 @@
     self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
                                             args)
 
+    if self._unpack_only:
+      # The unpack pool is only needed for --unpackonly runs.
+      self._unpack_queue = multiprocessing.Queue()
+      args = (self._unpack_queue, self._job_queue, emerge, package_db, False,
+              True)
+      self._unpack_pool = multiprocessing.Pool(self._unpack_procs, EmergeWorker,
+                                               args)
+
     self._print_worker = multiprocessing.Process(target=PrintWorker,
                                                  args=[self._print_queue])
     self._print_worker.start()
@@ -1267,6 +1327,10 @@
     signal.signal(signal.SIGINT, ExitHandler)
     signal.signal(signal.SIGTERM, ExitHandler)
 
+  def _ScheduleUnpack(self, pkg_state):
+    self._unpack_jobs[pkg_state.target] = None
+    self._unpack_queue.put(pkg_state)
+
   def _Schedule(self, pkg_state):
     # We maintain a tree of all deps, if this doesn't need
     # to be installed just free up its children and continue.
@@ -1284,19 +1348,31 @@
         self._build_queue.put(pkg_state)
         return True
 
-  def _ScheduleLoop(self):
+  def _ScheduleLoop(self, unpack_only=False):
+    if unpack_only:
+      ready_queue = self._unpack_ready
+      jobs_queue = self._unpack_jobs
+      procs = self._unpack_procs
+    else:
+      ready_queue = self._build_ready
+      jobs_queue = self._build_jobs
+      procs = self._build_procs
+
     # If the current load exceeds our desired load average, don't schedule
     # more than one job.
     if self._load_avg and os.getloadavg()[0] > self._load_avg:
       needed_jobs = 1
     else:
-      needed_jobs = self._build_procs
+      needed_jobs = procs
 
     # Schedule more jobs.
-    while self._build_ready and len(self._build_jobs) < needed_jobs:
-      state = self._build_ready.get()
-      if state.target not in self._failed:
-        self._Schedule(state)
+    while ready_queue and len(jobs_queue) < needed_jobs:
+      state = ready_queue.get()
+      if unpack_only:
+        self._ScheduleUnpack(state)
+      else:
+        if state.target not in self._failed:
+          self._Schedule(state)
 
   def _Print(self, line):
     """Print a single line."""
@@ -1337,12 +1413,15 @@
     if no_output:
       seconds = current_time - GLOBAL_START
       fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
+      ujobs, uready = len(self._unpack_jobs), len(self._unpack_ready)
       bjobs, bready = len(self._build_jobs), len(self._build_ready)
       retries = len(self._retry_queue)
       pending = max(0, len(self._deps_map) - fjobs - bjobs)
       line = "Pending %s/%s, " % (pending, self._total_jobs)
       if fjobs or fready:
         line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
+      if ujobs or uready:
+        line += "Unpacking %s/%s, " % (ujobs, uready + ujobs)
       if bjobs or bready or retries:
         line += "Building %s/%s, " % (bjobs, bready + bjobs)
         if retries:
@@ -1405,6 +1484,10 @@
     _stop(self._build_queue, self._build_pool)
     self._build_queue = self._build_pool = None
 
+    if self._unpack_only:
+      _stop(self._unpack_queue, self._unpack_pool)
+      self._unpack_queue = self._unpack_pool = None
+
     if self._job_queue is not None:
       self._job_queue.close()
       self._job_queue = None
@@ -1444,6 +1527,8 @@
           self._job_queue.empty() and
           not self._fetch_jobs and
           not self._fetch_ready and
+          not self._unpack_jobs and
+          not self._unpack_ready and
           not self._build_jobs and
           not self._build_ready and
           self._deps_map):
@@ -1498,6 +1583,10 @@
             self._build_ready.put(state)
             self._ScheduleLoop()
 
+          if self._unpack_only and job.retcode == 0:
+            self._unpack_ready.put(state)
+            self._ScheduleLoop(unpack_only=True)
+
           if self._fetch_ready:
             state = self._fetch_ready.get()
             self._fetch_queue.put(state)
@@ -1508,6 +1597,23 @@
             self._fetch_queue.put(None)
         continue
 
+      if job.unpack_only:
+        if not job.done:
+          self._unpack_jobs[target] = job
+        else:
+          del self._unpack_jobs[target]
+          self._Print("Unpacked %s in %2.2fs"
+                      % (target, time.time() - job.start_timestamp))
+          if self._show_output or job.retcode != 0:
+            self._print_queue.put(JobPrinter(job, unlink=True))
+          else:
+            os.unlink(job.filename)
+          if self._unpack_ready:
+            state = self._unpack_ready.get()
+            self._unpack_queue.put(state)
+            self._unpack_jobs[state.target] = None
+        continue
+
       if not job.done:
         self._build_jobs[target] = job
         self._Print("Started %s (logged in %s)" % (target, job.filename))
@@ -1667,7 +1773,8 @@
     os.execvp(args[0], args)
 
   # Run the queued emerges.
-  scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
+  scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output,
+                          deps.unpack_only)
   try:
     scheduler.Run()
   finally: