parallel_emerge: Implement prefetching of packages.
For local testing with a limited internet connection (~25Mb/s),
this takes 30-45s off of a ~18.5m build. The gain can vary a bit
depending on how our BINHOST servers are behaving.
In deploying pre-fetching support, we get a minor gain (~3-4%) for
binpkg-heavy merges, while structuring things so that when we add git
and svn awareness (specifically, fetching outside of the unpack stage),
it can be run by the prefetcher, allowing the system to build other
things while fetching is occurring.
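To make that overlap concrete, here is a minimal sketch of the
producer/consumer shape the prefetcher uses (illustrative only, not
chromite code; fetch_worker, PACKAGES, and the sleep standing in for a
download are all invented). It mirrors the push-None-back shutdown
sentinel the emerge workers use:

    import multiprocessing
    import time

    PACKAGES = ["sys-apps/foo-1.0", "dev-libs/bar-2.3", "app-misc/baz-0.9"]

    def fetch_worker(fetch_queue, fetched_queue):
        while True:
            pkg = fetch_queue.get()
            if pkg is None:
                # None is the shutdown sentinel; push it back so
                # sibling workers see it too.
                fetch_queue.put(None)
                return
            time.sleep(0.1)  # stand-in for the real network fetch
            fetched_queue.put(pkg)

    def main():
        fetch_queue = multiprocessing.Queue()
        fetched_queue = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=fetch_worker,
                                           args=(fetch_queue, fetched_queue))
                   for _ in range(2)]
        for w in workers:
            w.start()
        for pkg in PACKAGES:
            fetch_queue.put(pkg)
        # "Build" each package as soon as its fetch completes; building
        # overlaps with the downloads still in flight.
        for _ in PACKAGES:
            print("building %s" % fetched_queue.get())
        fetch_queue.put(None)
        for w in workers:
            w.join()

    if __name__ == "__main__":
        main()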
Note that this is a relanding; it was reverted in
I7b9dabd323613be45ea60b0a70b36f9f9ed936f1 due to an AttributeError bug.
BUG=chromium-os:13401
TEST=build_packages per the norm
Change-Id: I8f3e5fa39defc2dc7e1f88b6f7f6906570bf0daf
Reviewed-on: https://gerrit.chromium.org/gerrit/19037
Reviewed-by: Brian Harring <ferringb@chromium.org>
Tested-by: Brian Harring <ferringb@chromium.org>
Commit-Ready: Brian Harring <ferringb@chromium.org>
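For orientation before the diff: the ScoredHeap changes below pair a
heapq list with a set of targets so membership checks are O(1). A
standalone sketch of that pattern, under invented names (MemberHeap is
not the chromite class, and scores here are plain ints rather than the
tuple TargetState.update_score builds):

    import heapq

    class MemberHeap(object):
        """Min-heap that also supports O(1) membership tests by key."""

        def __init__(self):
            self.heap = []
            self._keys = set()

        def put(self, key, score):
            heapq.heappush(self.heap, (score, key))
            self._keys.add(key)

        def get(self):
            score, key = heapq.heappop(self.heap)
            self._keys.remove(key)
            return key

        def __contains__(self, key):
            return key in self._keys

        def __len__(self):
            return len(self.heap)

    h = MemberHeap()
    h.put("dev-libs/bar", 2)
    h.put("sys-apps/foo", 1)
    print("sys-apps/foo" in h)  # True, without scanning the heap
    print(h.get())              # "sys-apps/foo" (lowest score first)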
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 6189eac..fd0be55 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -830,10 +830,10 @@
class EmergeJobState(object):
__slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
"last_output_timestamp", "pkgname", "retcode", "start_timestamp",
- "target"]
+ "target", "fetch_only"]
def __init__(self, target, pkgname, done, filename, start_timestamp,
- retcode=None):
+ retcode=None, fetch_only=False):
# The full name of the target we're building (e.g.
# chromeos-base/chromeos-0.0.1-r60)
@@ -865,6 +865,9 @@
# The return code of our job, if the job is actually finished.
self.retcode = retcode
+ # Was this just a fetch job?
+ self.fetch_only = fetch_only
+
# The timestamp when our job started.
self.start_timestamp = start_timestamp
@@ -940,7 +943,7 @@
# Return the exit code of the subprocess.
return os.waitpid(pid, 0)[1]
-def EmergeWorker(task_queue, job_queue, emerge, package_db):
+def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
"""This worker emerges any packages given to it on the task_queue.
Args:
@@ -948,6 +951,7 @@
job_queue: The queue of results from the worker.
emerge: An EmergeData() object.
package_db: A dict, mapping package ids to portage Package objects.
+ fetch_only: A bool, indicating if we should just fetch the target.
It expects package identifiers to be passed to it via task_queue. When
a task is started, it pushes the (target, filename) to the started_queue.
@@ -962,29 +966,49 @@
root = emerge.settings["ROOT"]
vardb = emerge.trees[root]["vartree"].dbapi
vardb._flush_cache_enabled = False
+ bindb = emerge.trees[root]["bintree"].dbapi
+ # Might be a set, might be a list, might be None; no clue, just use shallow
+ # copy to ensure we can roll it back.
+ original_remotepkgs = copy.copy(bindb.bintree._remotepkgs)
opts, spinner = emerge.opts, emerge.spinner
opts["--nodeps"] = True
+ if fetch_only:
+ opts["--fetchonly"] = True
+
while True:
# Wait for a new item to show up on the queue. This is a blocking wait,
# so if there's nothing to do, we just sit here.
- target = task_queue.get()
- if not target:
+ pkg_state = task_queue.get()
+ if pkg_state is None:
# If target is None, this means that the main thread wants us to quit.
# The other workers need to exit too, so we'll push the message back on
# to the queue so they'll get it too.
- task_queue.put(target)
+ task_queue.put(None)
return
if KILLED.is_set():
return
+ target = pkg_state.target
+
db_pkg = package_db[target]
+
+ if db_pkg.type_name == "binary":
+ if not fetch_only and pkg_state.fetched_successfully:
+ # Ensure portage doesn't think our pkg is remote; else it'll force
+ # a redownload of it (even if the on-disk file is fine). In-memory
+ # caching basically, implemented dumbly.
+ bindb.bintree._remotepkgs = None
+ else:
+ bindb.bintree._remotepkgs = original_remotepkgs
+
db_pkg.root_config = emerge.root_config
install_list = [db_pkg]
pkgname = db_pkg.pf
output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False)
start_timestamp = time.time()
- job = EmergeJobState(target, pkgname, False, output.name, start_timestamp)
+ job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
+ fetch_only=fetch_only)
job_queue.put(job)
if "--pretend" in opts:
retcode = 0
@@ -1008,7 +1032,7 @@
return
job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
- retcode)
+ retcode, fetch_only=fetch_only)
job_queue.put(job)
@@ -1115,12 +1139,14 @@
raise
-class ScoredTarget(object):
+class TargetState(object):
- __slots__ = ("target", "info", "score")
+ __slots__ = ("target", "info", "score", "prefetched", "fetched_successfully")
- def __init__(self, target, info):
+ def __init__(self, target, info, fetched=False):
self.target, self.info = target, info
+ self.fetched_successfully = fetched
+ self.prefetched = False
self.update_score()
def __cmp__(self, other):
@@ -1129,6 +1155,7 @@
def update_score(self):
self.score = (
-len(self.info["tprovides"]),
+ len(self.info["needs"]),
not self.info["binary"],
-len(self.info["provides"]),
self.info["idx"],
@@ -1138,28 +1165,40 @@
class ScoredHeap(object):
+ __slots__ = ("heap", "_heap_set")
+
def __init__(self, initial=()):
- self.heap = list(initial)
- if self.heap:
- self.sort()
+ self.heap = list()
+ self._heap_set = set()
+ if initial:
+ self.multi_put(initial)
def get(self):
- node = heapq.heappop(self.heap)
- return node.target, node.info
+ item = heapq.heappop(self.heap)
+ self._heap_set.remove(item.target)
+ return item
- def put(self, target, data):
- heapq.heappush(self.heap, ScoredTarget(target, data))
+ def put(self, item):
+ if not isinstance(item, TargetState):
+ raise ValueError("Item %r isn't a TargetState" % (item,))
+ heapq.heappush(self.heap, item)
+ self._heap_set.add(item.target)
- def put_multi(self, sequence):
- self.heap.extend(ScoredTarget(*x) for x in sequence)
+ def multi_put(self, sequence):
+ sequence = list(sequence)
+ self.heap.extend(sequence)
+ self._heap_set.update(x.target for x in sequence)
self.sort()
- def __nonzero__(self):
- return bool(self.heap)
-
def sort(self):
heapq.heapify(self.heap)
+ def __contains__(self, target):
+ return target in self._heap_set
+
+ def __nonzero__(self):
+ return bool(self.heap)
+
def __len__(self):
return len(self.heap)
@@ -1170,9 +1209,12 @@
def __init__(self, deps_map, emerge, package_db, show_output):
# Store the dependency graph.
self._deps_map = deps_map
+ self._state_map = {}
# Initialize the running queue to empty
- self._jobs = {}
- self._ready = ScoredHeap()
+ self._build_jobs = {}
+ self._build_ready = ScoredHeap()
+ self._fetch_jobs = {}
+ self._fetch_ready = ScoredHeap()
# List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs)
@@ -1195,12 +1237,22 @@
# jobs.
procs = min(self._total_jobs,
emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
+ self._build_procs = procs
+ self._fetch_procs = procs
self._load_avg = emerge.opts.pop("--load-average", None)
- self._emerge_queue = multiprocessing.Queue()
self._job_queue = multiprocessing.Queue()
self._print_queue = multiprocessing.Queue()
- args = (self._emerge_queue, self._job_queue, emerge, package_db)
- self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
+
+ self._fetch_queue = multiprocessing.Queue()
+ args = (self._fetch_queue, self._job_queue, emerge, package_db, True)
+ self._fetch_pool = multiprocessing.Pool(self._fetch_procs, EmergeWorker,
+ args)
+
+ self._build_queue = multiprocessing.Queue()
+ args = (self._build_queue, self._job_queue, emerge, package_db)
+ self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
+ args)
+
self._print_worker = multiprocessing.Process(target=PrintWorker,
args=[self._print_queue])
self._print_worker.start()
@@ -1214,16 +1266,9 @@
self._SetupExitHandler()
# Schedule our jobs.
- l = []
- for target, info in deps_map.items():
- if info["nodeps"] or not info["needs"]:
- l.append((target, info))
-
- self._ready.put_multi(l)
- self._procs = procs
-
- # Print an update.
- self._Status()
+ self._state_map.update(
+ (pkg, TargetState(pkg, data)) for pkg, data in deps_map.iteritems())
+ self._fetch_ready.multi_put(self._state_map.itervalues())
def _SetupExitHandler(self):
@@ -1236,7 +1281,7 @@
signal.signal(signal.SIGTERM, KillHandler)
# Print our current job status
- for target, job in self._jobs.iteritems():
+ for job in self._build_jobs.itervalues():
if job:
self._print_queue.put(JobPrinter(job, unlink=True))
@@ -1253,22 +1298,22 @@
signal.signal(signal.SIGINT, ExitHandler)
signal.signal(signal.SIGTERM, ExitHandler)
- def _Schedule(self, target):
+ def _Schedule(self, pkg_state):
# We maintain a tree of all deps, if this doesn't need
# to be installed just free up its children and continue.
# It is possible to reinstall deps of deps, without reinstalling
# first level deps, like so:
# chromeos (merge) -> eselect (nomerge) -> python (merge)
- this_pkg = self._deps_map.get(target)
- if this_pkg is None:
- pass
- elif this_pkg["action"] == "nomerge":
- self._Finish(target)
- elif target not in self._jobs:
- # Kick off the build if it's marked to be built.
- self._jobs[target] = None
- self._emerge_queue.put(target)
- return True
+ this_pkg = pkg_state.info
+ target = pkg_state.target
+ if pkg_state.info is not None:
+ if this_pkg["action"] == "nomerge":
+ self._Finish(target)
+ elif target not in self._build_jobs:
+ # Kick off the build if it's marked to be built.
+ self._build_jobs[target] = None
+ self._build_queue.put(pkg_state)
+ return True
def _ScheduleLoop(self):
# If the current load exceeds our desired load average, don't schedule
@@ -1276,13 +1321,13 @@
if self._load_avg and os.getloadavg()[0] > self._load_avg:
needed_jobs = 1
else:
- needed_jobs = self._procs
+ needed_jobs = self._build_procs
# Schedule more jobs.
- while self._ready and len(self._jobs) < needed_jobs:
- pkg, data = self._ready.get()
- if pkg not in self._failed:
- self._Schedule(pkg)
+ while self._build_ready and len(self._build_jobs) < needed_jobs:
+ state = self._build_ready.get()
+ if state.target not in self._failed:
+ self._Schedule(state)
def _Print(self, line):
"""Print a single line."""
@@ -1302,7 +1347,7 @@
else:
interval = 60 * 60
notify_interval = 60 * 2
- for target, job in self._jobs.iteritems():
+ for target, job in self._build_jobs.iteritems():
if job:
last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
if last_timestamp + interval < current_time:
@@ -1322,12 +1367,20 @@
# here.
if no_output:
seconds = current_time - GLOBAL_START
- line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
- "[Time %dm%.1fs Load %s]")
+ fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
+ bjobs, bready = len(self._build_jobs), len(self._build_ready)
+ retries = len(self._retry_queue)
+ pending = max(0, len(self._deps_map) - fjobs - bjobs)
+ line = "Pending %s/%s, " % (pending, self._total_jobs)
+ if fjobs or fready:
+ line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
+ if bjobs or bready or retries:
+ line += "Building %s/%s, " % (bjobs, bready + bjobs)
+ if retries:
+ line += "Retrying %s, " % (retries,)
load = " ".join(str(x) for x in os.getloadavg())
- self._Print(line % (len(self._deps_map), len(self._ready),
- len(self._jobs), len(self._retry_queue),
- self._total_jobs, seconds / 60, seconds % 60, load))
+ line += ("[Time %dm%.1fs Load %s]" % (seconds/60, seconds %60, load))
+ self._Print(line)
def _Finish(self, target):
"""Mark a target as completed and unblock dependencies."""
@@ -1340,29 +1393,43 @@
finish = []
for dep in this_pkg["provides"]:
dep_pkg = self._deps_map[dep]
+ state = self._state_map[dep]
del dep_pkg["needs"][target]
- if not dep_pkg["needs"]:
+ state.update_score()
+ if not state.prefetched:
+ if dep in self._fetch_ready:
+ # If it's not currently being fetched, update the prioritization
+ self._fetch_ready.sort()
+ elif not dep_pkg["needs"]:
if dep_pkg["nodeps"] and dep_pkg["action"] == "nomerge":
self._Finish(dep)
else:
- self._ready.put(dep, dep_pkg)
+ self._build_ready.put(self._state_map[dep])
self._deps_map.pop(target)
def _Retry(self):
while self._retry_queue:
- target = self._retry_queue.pop(0)
- if self._Schedule(target):
- self._Print("Retrying emerge of %s." % target)
+ state = self._retry_queue.pop(0)
+ if self._Schedule(state):
+ self._Print("Retrying emerge of %s." % state.target)
break
def _Exit(self):
# Tell emerge workers to exit. They all exit when 'None' is pushed
# to the queue.
- self._emerge_queue.put(None)
- self._pool.close()
- self._pool.join()
- self._emerge_queue.close()
- self._emerge_queue = None
+ self._fetch_queue.put(None)
+ self._build_queue.put(None)
+
+ self._fetch_pool.close()
+ self._fetch_pool.join()
+
+ self._build_pool.close()
+ self._build_pool.join()
+
+ self._build_queue.close()
+ self._fetch_queue.close()
+
+ self._build_queue = self._fetch_queue = None
# Now that our workers are finished, we can kill the print queue.
self._print_queue.put(None)
@@ -1378,13 +1445,24 @@
Keep running so long as we have uninstalled packages in the
dependency graph to merge.
"""
+ # Start the fetchers.
+ for _ in xrange(min(self._fetch_procs, len(self._fetch_ready))):
+ state = self._fetch_ready.get()
+ self._fetch_jobs[state.target] = None
+ self._fetch_queue.put(state)
+
+ # Print an update, then get going.
+ self._Status()
+
retried = set()
while self._deps_map:
# Check here that we are actually waiting for something.
- if (self._emerge_queue.empty() and
+ if (self._build_queue.empty() and
self._job_queue.empty() and
- not self._jobs and
- not self._ready and
+ not self._fetch_jobs and
+ not self._fetch_ready and
+ not self._build_jobs and
+ not self._build_ready and
self._deps_map):
# If we have failed on a package, retry it now.
if self._retry_queue:
@@ -1417,8 +1495,38 @@
target = job.target
+ if job.fetch_only:
+ if not job.done:
+ self._fetch_jobs[job.target] = job
+ else:
+ state = self._state_map[job.target]
+ state.prefetched = True
+ state.fetched_successfully = (job.retcode == 0)
+ del self._fetch_jobs[job.target]
+ self._Print("Fetched %s in %2.2fs"
+ % (target, time.time() - job.start_timestamp))
+
+ if self._show_output or job.retcode != 0:
+ self._print_queue.put(JobPrinter(job, unlink=True))
+ else:
+ os.unlink(job.filename)
+ # Failure or not, let build work with it next.
+ if not self._deps_map[job.target]["needs"]:
+ self._build_ready.put(state)
+ self._ScheduleLoop()
+
+ if self._fetch_ready:
+ state = self._fetch_ready.get()
+ self._fetch_queue.put(state)
+ self._fetch_jobs[state.target] = None
+ else:
+ # Minor optimization; shut down fetchers early since we know
+ # the queue is empty.
+ self._fetch_queue.put(None)
+ continue
+
if not job.done:
- self._jobs[target] = job
+ self._build_jobs[target] = job
self._Print("Started %s (logged in %s)" % (target, job.filename))
continue
@@ -1427,7 +1535,7 @@
self._print_queue.put(JobPrinter(job, unlink=True))
else:
os.unlink(job.filename)
- del self._jobs[target]
+ del self._build_jobs[target]
seconds = time.time() - job.start_timestamp
details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60)
@@ -1442,7 +1550,7 @@
else:
# Queue up this build to try again after a long while.
retried.add(target)
- self._retry_queue.append(target)
+ self._retry_queue.append(self._state_map[target])
self._failed.add(target)
self._Print("Failed %s, retrying later." % details)
else: