parallel_emerge: add --unpackonly.
The --unpackonly flag makes parallel_emerge fetch and then unpack all
packages without emerging them. This is done by adding an unpack queue
which, when --unpackonly is set, is filled as fetch jobs complete.
When we only want to work with a project's binary packages, the current
workflow is to run --fetchonly and then extract each package. With
--unpackonly, the unpacking is parallelized with the fetching, making
the entire workflow faster.
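For example, an illustrative invocation (the board and package names
here are placeholders, not part of this change) would be:
  parallel_emerge --board=amd64-generic --unpackonly chromeos-base/libchrome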
BUG=chromium:264240
TEST=Compared the sysroot produced with --unpackonly against one produced
by running --fetchonly and then unpacking all packages manually.
Change-Id: I3d9acb7032f036b786e3cba9843b7720471eecdb
Signed-off-by: Thiago Goncales <thiagog@chromium.org>
Reviewed-on: https://gerrit.chromium.org/gerrit/62856
Reviewed-by: David James <davidjames@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 5e5833b..f536b18 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -29,6 +29,8 @@
import time
import traceback
+from chromite.lib import cros_build_lib
+
# If PORTAGE_USERNAME isn't specified, scrape it from the $HOME variable. On
# Chromium OS, the default "portage" user doesn't have the necessary
# permissions. It'd be easier if we could default to $USERNAME, but $USERNAME
@@ -203,13 +205,14 @@
PrintDepsMap(deps_graph)
"""
- __slots__ = ["board", "emerge", "package_db", "show_output"]
+ __slots__ = ["board", "emerge", "package_db", "show_output", "unpack_only"]
def __init__(self):
self.board = None
self.emerge = EmergeData()
self.package_db = {}
self.show_output = False
+ self.unpack_only = False
def ParseParallelEmergeArgs(self, argv):
"""Read the parallel emerge arguments from the command-line.
@@ -239,6 +242,9 @@
self.show_output = True
elif arg == "--rebuild":
emerge_args.append("--rebuild-if-unbuilt")
+ elif arg == "--unpackonly":
+ emerge_args.append("--fetchonly")
+ self.unpack_only = True
else:
# Not one of our options, so pass through to emerge.
emerge_args.append(arg)
@@ -792,10 +798,10 @@
class EmergeJobState(object):
__slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
"last_output_timestamp", "pkgname", "retcode", "start_timestamp",
- "target", "fetch_only"]
+ "target", "fetch_only", "unpack_only"]
def __init__(self, target, pkgname, done, filename, start_timestamp,
- retcode=None, fetch_only=False):
+ retcode=None, fetch_only=False, unpack_only=False):
# The full name of the target we're building (e.g.
# chromeos-base/chromeos-0.0.1-r60)
@@ -833,6 +839,9 @@
# The timestamp when our job started.
self.start_timestamp = start_timestamp
+    # If set, only unpack the package instead of emerging it.
+ self.unpack_only = unpack_only
+
def KillHandler(_signum, _frame):
# Kill self and all subprocesses.
@@ -916,7 +925,42 @@
# Return the exit code of the subprocess.
return os.waitpid(pid, 0)[1]
-def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
+
+def UnpackPackage(pkg_state):
+ """Unpacks package described by pkg_state.
+
+ Args:
+ pkg_state: EmergeJobState object describing target.
+
+ Returns:
+ Exit code returned by subprocess.
+ """
+ pkgdir = os.environ.get("PKGDIR",
+ os.path.join(os.environ["SYSROOT"], "packages"))
+ root = os.environ.get("ROOT", os.environ["SYSROOT"])
+ path = os.path.join(pkgdir, pkg_state.target + ".tbz2")
+ comp = cros_build_lib.FindCompressor(cros_build_lib.COMP_BZIP2)
+ cmd = [comp, "-dc"]
+ if comp.endswith("pbzip2"):
+ cmd.append("--ignore-trailing-garbage=1")
+ cmd.append(path)
+
+ result = cros_build_lib.RunCommand(cmd, cwd=root, stdout_to_pipe=True,
+ print_cmd=False, error_code_ok=True)
+
+  # If decompression failed, return now and don't attempt to untar.
+ if result.returncode:
+ return result.returncode
+
+ cmd = ["sudo", "tar", "-xf", "-", "-C", root]
+ result = cros_build_lib.RunCommand(cmd, cwd=root, input=result.output,
+ print_cmd=False, error_code_ok=True)
+
+ return result.returncode
+
+
+def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False,
+ unpack_only=False):
"""This worker emerges any packages given to it on the task_queue.
Args:
@@ -925,6 +969,7 @@
emerge: An EmergeData() object.
package_db: A dict, mapping package ids to portage Package objects.
fetch_only: A bool, indicating if we should just fetch the target.
+ unpack_only: A bool, indicating if we should just unpack the target.
It expects package identifiers to be passed to it via task_queue. When
a task is started, it pushes the (target, filename) to the started_queue.
@@ -982,16 +1027,19 @@
os.chmod(output.name, 644)
start_timestamp = time.time()
job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
- fetch_only=fetch_only)
+ fetch_only=fetch_only, unpack_only=unpack_only)
job_queue.put(job)
if "--pretend" in opts:
retcode = 0
else:
try:
emerge.scheduler_graph.mergelist = install_list
- retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
- spinner, favorites=emerge.favorites,
- graph_config=emerge.scheduler_graph)
+ if unpack_only:
+ retcode = UnpackPackage(pkg_state)
+ else:
+ retcode = EmergeProcess(output, settings, trees, mtimedb, opts,
+ spinner, favorites=emerge.favorites,
+ graph_config=emerge.scheduler_graph)
except Exception:
traceback.print_exc(file=output)
retcode = 1
@@ -1001,7 +1049,8 @@
return
job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
- retcode, fetch_only=fetch_only)
+ retcode, fetch_only=fetch_only,
+ unpack_only=unpack_only)
job_queue.put(job)
@@ -1176,7 +1225,7 @@
class EmergeQueue(object):
"""Class to schedule emerge jobs according to a dependency graph."""
- def __init__(self, deps_map, emerge, package_db, show_output):
+ def __init__(self, deps_map, emerge, package_db, show_output, unpack_only):
# Store the dependency graph.
self._deps_map = deps_map
self._state_map = {}
@@ -1185,10 +1234,13 @@
self._build_ready = ScoredHeap()
self._fetch_jobs = {}
self._fetch_ready = ScoredHeap()
+ self._unpack_jobs = {}
+ self._unpack_ready = ScoredHeap()
# List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs)
self._show_output = show_output
+ self._unpack_only = unpack_only
if "--pretend" in emerge.opts:
print "Skipping merge because of --pretend mode."
@@ -1207,7 +1259,7 @@
# jobs.
procs = min(self._total_jobs,
emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
- self._build_procs = self._fetch_procs = max(1, procs)
+ self._build_procs = self._unpack_procs = self._fetch_procs = max(1, procs)
self._load_avg = emerge.opts.pop("--load-average", None)
self._job_queue = multiprocessing.Queue()
self._print_queue = multiprocessing.Queue()
@@ -1222,6 +1274,14 @@
self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
args)
+ if self._unpack_only:
+      # The unpack pool is only needed for --unpackonly runs.
+ self._unpack_queue = multiprocessing.Queue()
+ args = (self._unpack_queue, self._job_queue, emerge, package_db, False,
+ True)
+ self._unpack_pool = multiprocessing.Pool(self._unpack_procs, EmergeWorker,
+ args)
+
self._print_worker = multiprocessing.Process(target=PrintWorker,
args=[self._print_queue])
self._print_worker.start()
@@ -1267,6 +1327,10 @@
signal.signal(signal.SIGINT, ExitHandler)
signal.signal(signal.SIGTERM, ExitHandler)
+ def _ScheduleUnpack(self, pkg_state):
+ self._unpack_jobs[pkg_state.target] = None
+ self._unpack_queue.put(pkg_state)
+
def _Schedule(self, pkg_state):
# We maintain a tree of all deps, if this doesn't need
# to be installed just free up its children and continue.
@@ -1284,19 +1348,31 @@
self._build_queue.put(pkg_state)
return True
- def _ScheduleLoop(self):
+ def _ScheduleLoop(self, unpack_only=False):
+ if unpack_only:
+ ready_queue = self._unpack_ready
+ jobs_queue = self._unpack_jobs
+ procs = self._unpack_procs
+ else:
+ ready_queue = self._build_ready
+ jobs_queue = self._build_jobs
+ procs = self._build_procs
+
# If the current load exceeds our desired load average, don't schedule
# more than one job.
if self._load_avg and os.getloadavg()[0] > self._load_avg:
needed_jobs = 1
else:
- needed_jobs = self._build_procs
+ needed_jobs = procs
# Schedule more jobs.
- while self._build_ready and len(self._build_jobs) < needed_jobs:
- state = self._build_ready.get()
- if state.target not in self._failed:
- self._Schedule(state)
+ while ready_queue and len(jobs_queue) < needed_jobs:
+ state = ready_queue.get()
+ if unpack_only:
+ self._ScheduleUnpack(state)
+ else:
+ if state.target not in self._failed:
+ self._Schedule(state)
def _Print(self, line):
"""Print a single line."""
@@ -1337,12 +1413,15 @@
if no_output:
seconds = current_time - GLOBAL_START
fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
+ ujobs, uready = len(self._unpack_jobs), len(self._unpack_ready)
bjobs, bready = len(self._build_jobs), len(self._build_ready)
retries = len(self._retry_queue)
pending = max(0, len(self._deps_map) - fjobs - bjobs)
line = "Pending %s/%s, " % (pending, self._total_jobs)
if fjobs or fready:
line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
+ if ujobs or uready:
+ line += "Unpacking %s/%s, " % (ujobs, uready + ujobs)
if bjobs or bready or retries:
line += "Building %s/%s, " % (bjobs, bready + bjobs)
if retries:
@@ -1405,6 +1484,10 @@
_stop(self._build_queue, self._build_pool)
self._build_queue = self._build_pool = None
+ if self._unpack_only:
+ _stop(self._unpack_queue, self._unpack_pool)
+ self._unpack_queue = self._unpack_pool = None
+
if self._job_queue is not None:
self._job_queue.close()
self._job_queue = None
@@ -1444,6 +1527,8 @@
self._job_queue.empty() and
not self._fetch_jobs and
not self._fetch_ready and
+ not self._unpack_jobs and
+ not self._unpack_ready and
not self._build_jobs and
not self._build_ready and
self._deps_map):
@@ -1498,6 +1583,10 @@
self._build_ready.put(state)
self._ScheduleLoop()
+ if self._unpack_only and job.retcode == 0:
+ self._unpack_ready.put(state)
+ self._ScheduleLoop(unpack_only=True)
+
if self._fetch_ready:
state = self._fetch_ready.get()
self._fetch_queue.put(state)
@@ -1508,6 +1597,23 @@
self._fetch_queue.put(None)
continue
+ if job.unpack_only:
+ if not job.done:
+ self._unpack_jobs[target] = job
+ else:
+ del self._unpack_jobs[target]
+ self._Print("Unpacked %s in %2.2fs"
+ % (target, time.time() - job.start_timestamp))
+ if self._show_output or job.retcode != 0:
+ self._print_queue.put(JobPrinter(job, unlink=True))
+ else:
+ os.unlink(job.filename)
+ if self._unpack_ready:
+ state = self._unpack_ready.get()
+ self._unpack_queue.put(state)
+ self._unpack_jobs[state.target] = None
+ continue
+
if not job.done:
self._build_jobs[target] = job
self._Print("Started %s (logged in %s)" % (target, job.filename))
@@ -1667,7 +1773,8 @@
os.execvp(args[0], args)
# Run the queued emerges.
- scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output)
+ scheduler = EmergeQueue(deps_graph, emerge, deps.package_db, deps.show_output,
+ deps.unpack_only)
try:
scheduler.Run()
finally: