parallel_emerge: prioritize pkgs blocking others.
This optimization is strictly at the local optimum level; the
purpose is that, all else being equal under the existing scoring,
we should try to prioritize unblocking the rest of the graph.
In local testing (primarily binpkg), the effect is mostly lost in
background noise; that said, the change *is* desired overall since
it locally prioritizes bottlenecks in the graph.
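As a rough illustration (package names below are made up; the real
ordering key lives in ScoredTarget.update_score), the idea reduces
to a min-heap keyed on the negated count of packages a target
unblocks:

  import heapq

  ready = []
  # Negate the unblock count so heapq's min-heap pops the biggest
  # bottleneck first when everything else is equal.
  heapq.heappush(ready, (-12, "sys-libs/glibc"))  # unblocks 12 packages
  heapq.heappush(ready, (-1, "app-misc/foo"))     # unblocks 1 package
  heapq.heappush(ready, (-3, "dev-libs/bar"))     # unblocks 3 packages
  assert heapq.heappop(ready)[1] == "sys-libs/glibc"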
Additionally, centralize the scoring and heap manipulation into a
class, both in preparation for future enhancements and to avoid
exposing heap internals to consuming code.
BUG=None
TEST=build_package --board=target # ensure it behaves fine.
Change-Id: I233009c3ad11c402635af47bb9bdd77952857f72
Reviewed-on: https://gerrit.chromium.org/gerrit/18424
Reviewed-by: Brian Harring <ferringb@chromium.org>
Tested-by: Brian Harring <ferringb@chromium.org>
Commit-Ready: Brian Harring <ferringb@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 894bdc9..b0c5c1c 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -1114,6 +1114,56 @@
continue
raise
+
+class ScoredTarget(object):
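+ """Wrap a target and its deps_map entry with a heap-ordering key."""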
+
+ __slots__ = ("target", "info", "score")
+
+ def __init__(self, target, info):
+ self.target, self.info = target, info
+ self.update_score()
+
+ def __cmp__(self, other):
+ return cmp(self.score, other.score)
+
+ def update_score(self):
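+ # heapq is a min-heap, so negating the provide counts makes targets
+ # that unblock the most other packages pop first; the remaining
+ # fields are deterministic tie-breakers.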
+ self.score = (
+ -len(self.info["tprovides"]),
+ self.info["binary"],
+ -len(self.info["provides"]),
+ self.info["idx"],
+ self.target,
+ )
+
+
+class ScoredHeap(object):
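+ """Min-heap of ScoredTargets; the most-blocking ready target pops first."""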
+
+ def __init__(self, initial=()):
+ self.heap = list(initial)
+ if self.heap:
+ self.sort()
+
+ def get(self):
+ node = heapq.heappop(self.heap)
+ return node.target, node.info
+
+ def put(self, target, data):
+ heapq.heappush(self.heap, ScoredTarget(target, data))
+
+ def put_multi(self, sequence):
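+ # Bulk insert: extending once and re-heapifying is cheaper than
+ # pushing each entry individually.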
+ self.heap.extend(ScoredTarget(*x) for x in sequence)
+ self.sort()
+
+ def __nonzero__(self):
+ return bool(self.heap)
+
+ def sort(self):
+ heapq.heapify(self.heap)
+
+ def __len__(self):
+ return len(self.heap)
+
+
class EmergeQueue(object):
"""Class to schedule emerge jobs according to a dependency graph."""
@@ -1122,7 +1172,7 @@
self._deps_map = deps_map
# Initialize the running queue to empty
self._jobs = {}
- self._ready = []
+ self._ready = ScoredHeap()
# List of total package installs represented in deps_map.
install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
self._total_jobs = len(install_jobs)
@@ -1164,13 +1214,13 @@
self._SetupExitHandler()
# Schedule our jobs.
+ l = []
for target, info in deps_map.items():
if info["nodeps"] or not info["needs"]:
- score = (-len(info["tprovides"]), info["binary"], info["idx"])
- self._ready.append((score, target))
- heapq.heapify(self._ready)
+ l.append((target, info))
+
+ self._ready.put_multi(l)
self._procs = procs
- self._ScheduleLoop()
# Print an update.
self._Status()
@@ -1230,7 +1280,7 @@
# Schedule more jobs.
while self._ready and len(self._jobs) < needed_jobs:
- score, pkg = heapq.heappop(self._ready)
+ pkg, data = self._ready.get()
if pkg not in self._failed:
self._Schedule(pkg)
@@ -1295,9 +1345,7 @@
if dep_pkg["nodeps"] and dep_pkg["action"] == "nomerge":
self._Finish(dep)
else:
- score = (-len(dep_pkg["tprovides"]), dep_pkg["binary"],
- dep_pkg["idx"])
- heapq.heappush(self._ready, (score, dep))
+ self._ready.put(dep, dep_pkg)
self._deps_map.pop(target)
def _Retry(self):