Revert "parallel_emerge: Implement prefetching of packages."
This reverts commit 710ab070f98cc65337bffcfddcfa6fb20acba84b.
See I6edfe0b5.
We see failures like this:
Traceback (most recent call last):
File "/home/chrome-bot/trunk/chromite/bin/parallel_emerge", line 64, in <module>
ret = target()
File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1658, in main
scheduler.Run()
File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1469, in Run
self._Retry()
File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1413, in _Retry
if self._Schedule(target):
File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1307, in _Schedule
this_pkg = pkg_state.info
AttributeError: 'str' object has no attribute 'info'
Example: http://chromegw.corp.google.com/i/chromeos/builders/x86-alex%20canary/builds/1829/steps/UnitTest%20%5Bx86-alex%5D/logs/stdio
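The likely trigger (a reconstruction from the traceback, not verified): the
prefetching change made _Schedule() take a TargetState whose .info field
carries the package dict, but _Retry() still pops bare target strings off
self._retry_queue and passes them straight through. A minimal sketch of that
mismatch, using names from the diff below but with a made-up retry path:

  class TargetState(object):
    def __init__(self, target, info):
      self.target, self.info = target, info

  def _Schedule(pkg_state):
    # Works when given a TargetState...
    this_pkg = pkg_state.info
    return this_pkg is not None

  # ...but the retry queue holds plain package names (strings).
  retry_queue = ["chromeos-base/chromeos-0.0.1-r60"]
  try:
    _Schedule(retry_queue.pop(0))
  except AttributeError as e:
    print(e)  # 'str' object has no attribute 'info'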
BUG=chromium-os:13401
TEST=none
Change-Id: I7b9dabd323613be45ea60b0a70b36f9f9ed936f1
Reviewed-on: https://gerrit.chromium.org/gerrit/19033
Reviewed-by: David James <davidjames@chromium.org>
Tested-by: David James <davidjames@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 61c68d4..6189eac 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -830,10 +830,10 @@
class EmergeJobState(object):
__slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
"last_output_timestamp", "pkgname", "retcode", "start_timestamp",
- "target", "fetch_only"]
+ "target"]
def __init__(self, target, pkgname, done, filename, start_timestamp,
- retcode=None, fetch_only=False):
+ retcode=None):
# The full name of the target we're building (e.g.
# chromeos-base/chromeos-0.0.1-r60)
@@ -865,9 +865,6 @@
# The return code of our job, if the job is actually finished.
self.retcode = retcode
- # Was this just a fetch job?
- self.fetch_only = fetch_only
-
# The timestamp when our job started.
self.start_timestamp = start_timestamp
@@ -943,7 +940,7 @@
# Return the exit code of the subprocess.
return os.waitpid(pid, 0)[1]
-def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
+def EmergeWorker(task_queue, job_queue, emerge, package_db):
"""This worker emerges any packages given to it on the task_queue.
Args:
@@ -951,7 +948,6 @@
job_queue: The queue of results from the worker.
emerge: An EmergeData() object.
package_db: A dict, mapping package ids to portage Package objects.
- fetch_only: A bool, indicating if we should just fetch the target.
It expects package identifiers to be passed to it via task_queue. When
a task is started, it pushes the (target, filename) to the started_queue.
@@ -966,49 +962,29 @@
root = emerge.settings["ROOT"]
vardb = emerge.trees[root]["vartree"].dbapi
vardb._flush_cache_enabled = False
- bindb = emerge.trees[root]["bintree"].dbapi
- # Might be a set, might be a list, might be None; no clue, just use shallow
- # copy to ensure we can roll it back.
- original_remotepkgs = copy.copy(bindb.bintree._remotepkgs)
opts, spinner = emerge.opts, emerge.spinner
opts["--nodeps"] = True
- if fetch_only:
- opts["--fetchonly"] = True
-
while True:
# Wait for a new item to show up on the queue. This is a blocking wait,
# so if there's nothing to do, we just sit here.
- pkg_state = task_queue.get()
- if pkg_state is None:
+ target = task_queue.get()
+ if not target:
# If target is None, this means that the main thread wants us to quit.
# The other workers need to exit too, so we'll push the message back on
# to the queue so they'll get it too.
- task_queue.put(None)
+ task_queue.put(target)
return
if KILLED.is_set():
return
- target = pkg_state.target
-
db_pkg = package_db[target]
-
- if db_pkg.type_name == "binary":
- if not fetch_only and pkg_state.fetched_successfully:
- # Ensure portage doesn't think our pkg is remote- else it'll force
- # a redownload of it (even if the on-disk file is fine). In-memory
- # caching basically, implemented dumbly.
- bindb.bintree._remotepkgs = None
- else:
- bindb.bintree_remotepkgs = original_remotepkgs
-
db_pkg.root_config = emerge.root_config
install_list = [db_pkg]
pkgname = db_pkg.pf
output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False)
start_timestamp = time.time()
- job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
- fetch_only=fetch_only)
+ job = EmergeJobState(target, pkgname, False, output.name, start_timestamp)
job_queue.put(job)
if "--pretend" in opts:
retcode = 0
@@ -1032,7 +1008,7 @@
return
job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
- retcode, fetch_only=fetch_only)
+ retcode)
job_queue.put(job)
@@ -1139,14 +1115,12 @@
raise
-class TargetState(object):
+class ScoredTarget(object):
- __slots__ = ("target", "info", "score", "prefetched", "fetched_successfully")
+ __slots__ = ("target", "info", "score")
- def __init__(self, target, info, fetched=False):
+ def __init__(self, target, info):
self.target, self.info = target, info
- self.fetched_successfully = False
- self.prefetched = False
self.update_score()
def __cmp__(self, other):
@@ -1155,7 +1129,6 @@
def update_score(self):
self.score = (
-len(self.info["tprovides"]),
- len(self.info["needs"]),
not self.info["binary"],
-len(self.info["provides"]),
self.info["idx"],
@@ -1165,40 +1138,28 @@
class ScoredHeap(object):
- __slots__ = ("heap", "_heap_set")
-
def __init__(self, initial=()):
- self.heap = list()
- self._heap_set = set()
- if initial:
- self.multi_put(initial)
+ self.heap = list(initial)
+ if self.heap:
+ self.sort()
def get(self):
- item = heapq.heappop(self.heap)
- self._heap_set.remove(item.target)
- return item
+ node = heapq.heappop(self.heap)
+ return node.target, node.info
- def put(self, item):
- if not isinstance(item, TargetState):
- raise ValueError("Item %r isn't a TargetState" % (item,))
- heapq.heappush(self.heap, item)
- self._heap_set.add(item.target)
+ def put(self, target, data):
+ heapq.heappush(self.heap, ScoredTarget(target, data))
- def multi_put(self, sequence):
- sequence = list(sequence)
- self.heap.extend(sequence)
- self._heap_set.update(x.target for x in sequence)
+ def put_multi(self, sequence):
+ self.heap.extend(ScoredTarget(*x) for x in sequence)
self.sort()
- def sort(self):
- heapq.heapify(self.heap)
-
- def __contains__(self, target):
- return target in self._heap_set
-
def __nonzero__(self):
return bool(self.heap)
+ def sort(self):
+ heapq.heapify(self.heap)
+
def __len__(self):
return len(self.heap)
@@ -1209,12 +1170,9 @@
def __init__(self, deps_map, emerge, package_db, show_output):
# Store the dependency graph.
self._deps_map = deps_map
- self._state_map = {}
# Initialize the running queue to empty
- self._build_jobs = {}
- self._build_ready = ScoredHeap()
- self._fetch_jobs = {}
- self._fetch_ready = ScoredHeap()
+ self._jobs = {}
+ self._ready = ScoredHeap()
# List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs)
@@ -1237,22 +1195,12 @@
# jobs.
procs = min(self._total_jobs,
emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
- self._build_procs = procs
- self._fetch_procs = procs
self._load_avg = emerge.opts.pop("--load-average", None)
+ self._emerge_queue = multiprocessing.Queue()
self._job_queue = multiprocessing.Queue()
self._print_queue = multiprocessing.Queue()
-
- self._fetch_queue = multiprocessing.Queue()
- args = (self._fetch_queue, self._job_queue, emerge, package_db, True)
- self._fetch_pool = multiprocessing.Pool(self._fetch_procs, EmergeWorker,
- args)
-
- self._build_queue = multiprocessing.Queue()
- args = (self._build_queue, self._job_queue, emerge, package_db)
- self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
- args)
-
+ args = (self._emerge_queue, self._job_queue, emerge, package_db)
+ self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
self._print_worker = multiprocessing.Process(target=PrintWorker,
args=[self._print_queue])
self._print_worker.start()
@@ -1266,9 +1214,16 @@
self._SetupExitHandler()
# Schedule our jobs.
- self._state_map.update(
- (pkg, TargetState(pkg, data)) for pkg, data in deps_map.iteritems())
- self._fetch_ready.multi_put(self._state_map.itervalues())
+ l = []
+ for target, info in deps_map.items():
+ if info["nodeps"] or not info["needs"]:
+ l.append((target, info))
+
+ self._ready.put_multi(l)
+ self._procs = procs
+
+ # Print an update.
+ self._Status()
def _SetupExitHandler(self):
@@ -1281,7 +1236,7 @@
signal.signal(signal.SIGTERM, KillHandler)
# Print our current job status
- for job in self._build_jobs.itervalues():
+ for target, job in self._jobs.iteritems():
if job:
self._print_queue.put(JobPrinter(job, unlink=True))
@@ -1298,22 +1253,22 @@
signal.signal(signal.SIGINT, ExitHandler)
signal.signal(signal.SIGTERM, ExitHandler)
- def _Schedule(self, pkg_state):
+ def _Schedule(self, target):
# We maintain a tree of all deps, if this doesn't need
# to be installed just free up its children and continue.
# It is possible to reinstall deps of deps, without reinstalling
# first level deps, like so:
# chromeos (merge) -> eselect (nomerge) -> python (merge)
- this_pkg = pkg_state.info
- target = pkg_state.target
- if pkg_state.info is not None:
- if this_pkg["action"] == "nomerge":
- self._Finish(target)
- elif target not in self._build_jobs:
- # Kick off the build if it's marked to be built.
- self._build_jobs[target] = None
- self._build_queue.put(pkg_state)
- return True
+ this_pkg = self._deps_map.get(target)
+ if this_pkg is None:
+ pass
+ elif this_pkg["action"] == "nomerge":
+ self._Finish(target)
+ elif target not in self._jobs:
+ # Kick off the build if it's marked to be built.
+ self._jobs[target] = None
+ self._emerge_queue.put(target)
+ return True
def _ScheduleLoop(self):
# If the current load exceeds our desired load average, don't schedule
@@ -1321,13 +1276,13 @@
if self._load_avg and os.getloadavg()[0] > self._load_avg:
needed_jobs = 1
else:
- needed_jobs = self._build_procs
+ needed_jobs = self._procs
# Schedule more jobs.
- while self._build_ready and len(self._build_jobs) < needed_jobs:
- state = self._build_ready.get()
- if state.target not in self._failed:
- self._Schedule(state)
+ while self._ready and len(self._jobs) < needed_jobs:
+ pkg, data = self._ready.get()
+ if pkg not in self._failed:
+ self._Schedule(pkg)
def _Print(self, line):
"""Print a single line."""
@@ -1347,7 +1302,7 @@
else:
interval = 60 * 60
notify_interval = 60 * 2
- for target, job in self._build_jobs.iteritems():
+ for target, job in self._jobs.iteritems():
if job:
last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
if last_timestamp + interval < current_time:
@@ -1367,20 +1322,12 @@
# here.
if no_output:
seconds = current_time - GLOBAL_START
- fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
- bjobs, bready = len(self._build_jobs), len(self._build_ready)
- retries = len(self._retry_queue)
- pending = max(0, len(self._deps_map) - fjobs - bjobs)
- line = "Pending %s/%s, " % (pending, self._total_jobs)
- if fjobs or fready:
- line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
- if bjobs or bready or retries:
- line += "Building %s/%s, " % (bjobs, bready + bjobs)
- if retries:
- line += "Retrying %s, " % (retries,)
+ line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
+ "[Time %dm%.1fs Load %s]")
load = " ".join(str(x) for x in os.getloadavg())
- line += (" [Time %dm%.1fs Load %s]" % (seconds/60, seconds %60, load))
- self._Print(line)
+ self._Print(line % (len(self._deps_map), len(self._ready),
+ len(self._jobs), len(self._retry_queue),
+ self._total_jobs, seconds / 60, seconds % 60, load))
def _Finish(self, target):
"""Mark a target as completed and unblock dependencies."""
@@ -1393,18 +1340,12 @@
finish = []
for dep in this_pkg["provides"]:
dep_pkg = self._deps_map[dep]
- state = self._state_map[dep]
del dep_pkg["needs"][target]
- state.update_score()
- if not state.prefetched:
- if dep in self._fetch_ready:
- # If it's not currently being fetched, update the priorization
- self._fetch_ready.sort()
- elif not dep_pkg["needs"]:
+ if not dep_pkg["needs"]:
if dep_pkg["nodeps"] and dep_pkg["action"] == "nomerge":
self._Finish(dep)
else:
- self._build_ready.put(self._state_map[dep])
+ self._ready.put(dep, dep_pkg)
self._deps_map.pop(target)
def _Retry(self):
@@ -1417,19 +1358,11 @@
def _Exit(self):
# Tell emerge workers to exit. They all exit when 'None' is pushed
# to the queue.
- self._fetch_queue.put(None)
- self._build_queue.put(None)
-
- self._fetch_pool.close()
- self._fetch_pool.join()
-
- self._build_pool.close()
- self._build_pool.join()
-
- self._build_queue.close()
- self._fetch_queue.close()
-
- self._build_queue = self._fetch_queue = None
+ self._emerge_queue.put(None)
+ self._pool.close()
+ self._pool.join()
+ self._emerge_queue.close()
+ self._emerge_queue = None
# Now that our workers are finished, we can kill the print queue.
self._print_queue.put(None)
@@ -1445,24 +1378,13 @@
Keep running so long as we have uninstalled packages in the
dependency graph to merge.
"""
- # Start the fetchers.
- for _ in xrange(min(self._fetch_procs, len(self._fetch_ready))):
- state = self._fetch_ready.get()
- self._fetch_jobs[state.target] = None
- self._fetch_queue.put(state)
-
- # Print an update, then get going.
- self._Status()
-
retried = set()
while self._deps_map:
# Check here that we are actually waiting for something.
- if (self._build_queue.empty() and
+ if (self._emerge_queue.empty() and
self._job_queue.empty() and
- not self._fetch_jobs and
- not self._fetch_ready and
- not self._build_jobs and
- not self._build_ready and
+ not self._jobs and
+ not self._ready and
self._deps_map):
# If we have failed on a package, retry it now.
if self._retry_queue:
@@ -1495,40 +1417,8 @@
target = job.target
- if job.fetch_only:
- if not job.done:
- self._Print("Fetching %s (logged in %s)" % (target, job.filename))
- self._fetch_jobs[job.target] = job
- else:
- state = self._state_map[job.target]
- state.prefetched = True
- state.fetched_successfully = (job.retcode == 0)
- del self._fetch_jobs[job.target]
- self._Print("Fetched %s in %2.2fs"
- % (target, time.time() - job.start_timestamp))
-
- if self._show_output or job.retcode != 0:
- self._print_queue.put(JobPrinter(job, unlink=True))
- else:
- os.unlink(job.filename)
- # Failure or not, let build work with it next.
- if not self._deps_map[job.target]["needs"]:
- self._build_ready.put(state)
- self._ScheduleLoop()
-
- if self._fetch_ready:
- state = self._fetch_ready.get()
- self._fetch_queue.put(state)
- self._fetch_jobs[state.target] = None
- else:
- # Minor optimization; shut down fetchers early since we know
- # the queue is empty.
- self._fetch_queue.put(None)
- self._Status()
- continue
-
if not job.done:
- self._build_jobs[target] = job
+ self._jobs[target] = job
self._Print("Started %s (logged in %s)" % (target, job.filename))
continue
@@ -1537,7 +1427,7 @@
self._print_queue.put(JobPrinter(job, unlink=True))
else:
os.unlink(job.filename)
- del self._build_jobs[target]
+ del self._jobs[target]
seconds = time.time() - job.start_timestamp
details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60)