Revert "parallel_emerge: Implement prefetching of packages."

This reverts commit 710ab070f98cc65337bffcfddcfa6fb20acba84b
See I6edfe0b5.

We see failures like this:
Traceback (most recent call last):
  File "/home/chrome-bot/trunk/chromite/bin/parallel_emerge", line 64, in <module>
    ret = target()
  File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1658, in main
    scheduler.Run()
  File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1469, in Run
    self._Retry()
  File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1413, in _Retry
    if self._Schedule(target):
  File "/home/chrome-bot/trunk/chromite/scripts/parallel_emerge.py", line 1307, in _Schedule
    this_pkg = pkg_state.info
AttributeError: 'str' object has no attribute 'info'

Example: http://chromegw.corp.google.com/i/chromeos/builders/x86-alex%20canary/builds/1829/steps/UnitTest%20%5Bx86-alex%5D/logs/stdio

BUG=chromium-os:13401
TEST=none

Change-Id: I7b9dabd323613be45ea60b0a70b36f9f9ed936f1
Reviewed-on: https://gerrit.chromium.org/gerrit/19033
Reviewed-by: David James <davidjames@chromium.org>
Tested-by: David James <davidjames@chromium.org>
diff --git a/scripts/parallel_emerge.py b/scripts/parallel_emerge.py
index 61c68d4..6189eac 100644
--- a/scripts/parallel_emerge.py
+++ b/scripts/parallel_emerge.py
@@ -830,10 +830,10 @@
 class EmergeJobState(object):
   __slots__ = ["done", "filename", "last_notify_timestamp", "last_output_seek",
                "last_output_timestamp", "pkgname", "retcode", "start_timestamp",
-               "target", "fetch_only"]
+               "target"]
 
   def __init__(self, target, pkgname, done, filename, start_timestamp,
-               retcode=None, fetch_only=False):
+               retcode=None):
 
     # The full name of the target we're building (e.g.
     # chromeos-base/chromeos-0.0.1-r60)
@@ -865,9 +865,6 @@
     # The return code of our job, if the job is actually finished.
     self.retcode = retcode
 
-    # Was this just a fetch job?
-    self.fetch_only = fetch_only
-
     # The timestamp when our job started.
     self.start_timestamp = start_timestamp
 
@@ -943,7 +940,7 @@
     # Return the exit code of the subprocess.
     return os.waitpid(pid, 0)[1]
 
-def EmergeWorker(task_queue, job_queue, emerge, package_db, fetch_only=False):
+def EmergeWorker(task_queue, job_queue, emerge, package_db):
   """This worker emerges any packages given to it on the task_queue.
 
   Args:
@@ -951,7 +948,6 @@
     job_queue: The queue of results from the worker.
     emerge: An EmergeData() object.
     package_db: A dict, mapping package ids to portage Package objects.
-    fetch_only: A bool, indicating if we should just fetch the target.
 
   It expects package identifiers to be passed to it via task_queue. When
   a task is started, it pushes the (target, filename) to the started_queue.
@@ -966,49 +962,29 @@
   root = emerge.settings["ROOT"]
   vardb = emerge.trees[root]["vartree"].dbapi
   vardb._flush_cache_enabled = False
-  bindb = emerge.trees[root]["bintree"].dbapi
-  # Might be a set, might be a list, might be None; no clue, just use shallow
-  # copy to ensure we can roll it back.
-  original_remotepkgs = copy.copy(bindb.bintree._remotepkgs)
 
   opts, spinner = emerge.opts, emerge.spinner
   opts["--nodeps"] = True
-  if fetch_only:
-    opts["--fetchonly"] = True
-
   while True:
     # Wait for a new item to show up on the queue. This is a blocking wait,
     # so if there's nothing to do, we just sit here.
-    pkg_state = task_queue.get()
-    if pkg_state is None:
+    target = task_queue.get()
+    if not target:
       # If target is None, this means that the main thread wants us to quit.
       # The other workers need to exit too, so we'll push the message back on
       # to the queue so they'll get it too.
-      task_queue.put(None)
+      task_queue.put(target)
       return
     if KILLED.is_set():
       return
 
-    target = pkg_state.target
-
     db_pkg = package_db[target]
-
-    if db_pkg.type_name == "binary":
-      if not fetch_only and pkg_state.fetched_successfully:
-        # Ensure portage doesn't think our pkg is remote- else it'll force
-        # a redownload of it (even if the on-disk file is fine).  In-memory
-        # caching basically, implemented dumbly.
-        bindb.bintree._remotepkgs = None
-    else:
-      bindb.bintree_remotepkgs = original_remotepkgs
-
     db_pkg.root_config = emerge.root_config
     install_list = [db_pkg]
     pkgname = db_pkg.pf
     output = tempfile.NamedTemporaryFile(prefix=pkgname + "-", delete=False)
     start_timestamp = time.time()
-    job = EmergeJobState(target, pkgname, False, output.name, start_timestamp,
-                         fetch_only=fetch_only)
+    job = EmergeJobState(target, pkgname, False, output.name, start_timestamp)
     job_queue.put(job)
     if "--pretend" in opts:
       retcode = 0
@@ -1032,7 +1008,7 @@
       return
 
     job = EmergeJobState(target, pkgname, True, output.name, start_timestamp,
-                         retcode, fetch_only=fetch_only)
+                         retcode)
     job_queue.put(job)
 
 
@@ -1139,14 +1115,12 @@
       raise
 
 
-class TargetState(object):
+class ScoredTarget(object):
 
-  __slots__ = ("target", "info", "score", "prefetched", "fetched_successfully")
+  __slots__ = ("target", "info", "score")
 
-  def __init__(self, target, info, fetched=False):
+  def __init__(self, target, info):
     self.target, self.info = target, info
-    self.fetched_successfully = False
-    self.prefetched = False
     self.update_score()
 
   def __cmp__(self, other):
@@ -1155,7 +1129,6 @@
   def update_score(self):
     self.score = (
         -len(self.info["tprovides"]),
-        len(self.info["needs"]),
         not self.info["binary"],
         -len(self.info["provides"]),
         self.info["idx"],
@@ -1165,40 +1138,28 @@
 
 class ScoredHeap(object):
 
-  __slots__ = ("heap", "_heap_set")
-
   def __init__(self, initial=()):
-    self.heap = list()
-    self._heap_set = set()
-    if initial:
-      self.multi_put(initial)
+    self.heap = list(initial)
+    if self.heap:
+      self.sort()
 
   def get(self):
-    item = heapq.heappop(self.heap)
-    self._heap_set.remove(item.target)
-    return item
+    node = heapq.heappop(self.heap)
+    return node.target, node.info
 
-  def put(self, item):
-    if not isinstance(item, TargetState):
-      raise ValueError("Item %r isn't a TargetState" % (item,))
-    heapq.heappush(self.heap, item)
-    self._heap_set.add(item.target)
+  def put(self, target, data):
+    heapq.heappush(self.heap, ScoredTarget(target, data))
 
-  def multi_put(self, sequence):
-    sequence = list(sequence)
-    self.heap.extend(sequence)
-    self._heap_set.update(x.target for x in sequence)
+  def put_multi(self, sequence):
+    self.heap.extend(ScoredTarget(*x) for x in sequence)
     self.sort()
 
-  def sort(self):
-    heapq.heapify(self.heap)
-
-  def __contains__(self, target):
-    return target in self._heap_set
-
   def __nonzero__(self):
     return bool(self.heap)
 
+  def sort(self):
+    heapq.heapify(self.heap)
+
   def __len__(self):
     return len(self.heap)
 
@@ -1209,12 +1170,9 @@
   def __init__(self, deps_map, emerge, package_db, show_output):
     # Store the dependency graph.
     self._deps_map = deps_map
-    self._state_map = {}
     # Initialize the running queue to empty
-    self._build_jobs = {}
-    self._build_ready = ScoredHeap()
-    self._fetch_jobs = {}
-    self._fetch_ready = ScoredHeap()
+    self._jobs = {}
+    self._ready = ScoredHeap()
     # List of total package installs represented in deps_map.
     install_jobs = [x for x in deps_map if deps_map[x]["action"] == "merge"]
     self._total_jobs = len(install_jobs)
@@ -1237,22 +1195,12 @@
     # jobs.
     procs = min(self._total_jobs,
                 emerge.opts.pop("--jobs", multiprocessing.cpu_count()))
-    self._build_procs = procs
-    self._fetch_procs = procs
     self._load_avg = emerge.opts.pop("--load-average", None)
+    self._emerge_queue = multiprocessing.Queue()
     self._job_queue = multiprocessing.Queue()
     self._print_queue = multiprocessing.Queue()
-
-    self._fetch_queue = multiprocessing.Queue()
-    args = (self._fetch_queue, self._job_queue, emerge, package_db, True)
-    self._fetch_pool = multiprocessing.Pool(self._fetch_procs, EmergeWorker,
-                                            args)
-
-    self._build_queue = multiprocessing.Queue()
-    args = (self._build_queue, self._job_queue, emerge, package_db)
-    self._build_pool = multiprocessing.Pool(self._build_procs, EmergeWorker,
-                                            args)
-
+    args = (self._emerge_queue, self._job_queue, emerge, package_db)
+    self._pool = multiprocessing.Pool(procs, EmergeWorker, args)
     self._print_worker = multiprocessing.Process(target=PrintWorker,
                                                  args=[self._print_queue])
     self._print_worker.start()
@@ -1266,9 +1214,16 @@
     self._SetupExitHandler()
 
     # Schedule our jobs.
-    self._state_map.update(
-        (pkg, TargetState(pkg, data)) for pkg, data in deps_map.iteritems())
-    self._fetch_ready.multi_put(self._state_map.itervalues())
+    l = []
+    for target, info in deps_map.items():
+      if info["nodeps"] or not info["needs"]:
+        l.append((target, info))
+
+    self._ready.put_multi(l)
+    self._procs = procs
+
+    # Print an update.
+    self._Status()
 
   def _SetupExitHandler(self):
 
@@ -1281,7 +1236,7 @@
       signal.signal(signal.SIGTERM, KillHandler)
 
       # Print our current job status
-      for job in self._build_jobs.itervalues():
+      for target, job in self._jobs.iteritems():
         if job:
           self._print_queue.put(JobPrinter(job, unlink=True))
 
@@ -1298,22 +1253,22 @@
     signal.signal(signal.SIGINT, ExitHandler)
     signal.signal(signal.SIGTERM, ExitHandler)
 
-  def _Schedule(self, pkg_state):
+  def _Schedule(self, target):
     # We maintain a tree of all deps, if this doesn't need
     # to be installed just free up its children and continue.
     # It is possible to reinstall deps of deps, without reinstalling
     # first level deps, like so:
     # chromeos (merge) -> eselect (nomerge) -> python (merge)
-    this_pkg = pkg_state.info
-    target = pkg_state.target
-    if pkg_state.info is not None:
-      if this_pkg["action"] == "nomerge":
-        self._Finish(target)
-      elif target not in self._build_jobs:
-        # Kick off the build if it's marked to be built.
-        self._build_jobs[target] = None
-        self._build_queue.put(pkg_state)
-        return True
+    this_pkg = self._deps_map.get(target)
+    if this_pkg is None:
+      pass
+    elif this_pkg["action"] == "nomerge":
+      self._Finish(target)
+    elif target not in self._jobs:
+      # Kick off the build if it's marked to be built.
+      self._jobs[target] = None
+      self._emerge_queue.put(target)
+      return True
 
   def _ScheduleLoop(self):
     # If the current load exceeds our desired load average, don't schedule
@@ -1321,13 +1276,13 @@
     if self._load_avg and os.getloadavg()[0] > self._load_avg:
       needed_jobs = 1
     else:
-      needed_jobs = self._build_procs
+      needed_jobs = self._procs
 
     # Schedule more jobs.
-    while self._build_ready and len(self._build_jobs) < needed_jobs:
-      state = self._build_ready.get()
-      if state.target not in self._failed:
-        self._Schedule(state)
+    while self._ready and len(self._jobs) < needed_jobs:
+      pkg, data = self._ready.get()
+      if pkg not in self._failed:
+        self._Schedule(pkg)
 
   def _Print(self, line):
     """Print a single line."""
@@ -1347,7 +1302,7 @@
     else:
       interval = 60 * 60
       notify_interval = 60 * 2
-    for target, job in self._build_jobs.iteritems():
+    for target, job in self._jobs.iteritems():
       if job:
         last_timestamp = max(job.start_timestamp, job.last_output_timestamp)
         if last_timestamp + interval < current_time:
@@ -1367,20 +1322,12 @@
     # here.
     if no_output:
       seconds = current_time - GLOBAL_START
-      fjobs, fready = len(self._fetch_jobs), len(self._fetch_ready)
-      bjobs, bready = len(self._build_jobs), len(self._build_ready)
-      retries = len(self._retry_queue)
-      pending = max(0, len(self._deps_map) - fjobs - bjobs)
-      line = "Pending %s/%s, " % (pending, self._total_jobs)
-      if fjobs or fready:
-        line += "Fetching %s/%s, " % (fjobs, fready + fjobs)
-      if bjobs or bready or retries:
-        line += "Building %s/%s, " % (bjobs, bready + bjobs)
-        if retries:
-          line += "Retrying %s, " % (retries,)
+      line = ("Pending %s, Ready %s, Running %s, Retrying %s, Total %s "
+              "[Time %dm%.1fs Load %s]")
       load =  " ".join(str(x) for x in os.getloadavg())
-      line += (" [Time %dm%.1fs Load %s]" % (seconds/60, seconds %60, load))
-      self._Print(line)
+      self._Print(line % (len(self._deps_map), len(self._ready),
+                          len(self._jobs), len(self._retry_queue),
+                          self._total_jobs, seconds / 60, seconds % 60, load))
 
   def _Finish(self, target):
     """Mark a target as completed and unblock dependencies."""
@@ -1393,18 +1340,12 @@
       finish = []
       for dep in this_pkg["provides"]:
         dep_pkg = self._deps_map[dep]
-        state = self._state_map[dep]
         del dep_pkg["needs"][target]
-        state.update_score()
-        if not state.prefetched:
-          if dep in self._fetch_ready:
-            # If it's not currently being fetched, update the priorization
-            self._fetch_ready.sort()
-        elif not dep_pkg["needs"]:
+        if not dep_pkg["needs"]:
           if dep_pkg["nodeps"] and dep_pkg["action"] == "nomerge":
             self._Finish(dep)
           else:
-            self._build_ready.put(self._state_map[dep])
+            self._ready.put(dep, dep_pkg)
       self._deps_map.pop(target)
 
   def _Retry(self):
@@ -1417,19 +1358,11 @@
   def _Exit(self):
     # Tell emerge workers to exit. They all exit when 'None' is pushed
     # to the queue.
-    self._fetch_queue.put(None)
-    self._build_queue.put(None)
-
-    self._fetch_pool.close()
-    self._fetch_pool.join()
-
-    self._build_pool.close()
-    self._build_pool.join()
-
-    self._build_queue.close()
-    self._fetch_queue.close()
-
-    self._build_queue = self._fetch_queue = None
+    self._emerge_queue.put(None)
+    self._pool.close()
+    self._pool.join()
+    self._emerge_queue.close()
+    self._emerge_queue = None
 
     # Now that our workers are finished, we can kill the print queue.
     self._print_queue.put(None)
@@ -1445,24 +1378,13 @@
     Keep running so long as we have uninstalled packages in the
     dependency graph to merge.
     """
-    # Start the fetchers.
-    for _ in xrange(min(self._fetch_procs, len(self._fetch_ready))):
-      state = self._fetch_ready.get()
-      self._fetch_jobs[state.target] = None
-      self._fetch_queue.put(state)
-
-    # Print an update, then get going.
-    self._Status()
-
     retried = set()
     while self._deps_map:
       # Check here that we are actually waiting for something.
-      if (self._build_queue.empty() and
+      if (self._emerge_queue.empty() and
           self._job_queue.empty() and
-          not self._fetch_jobs and
-          not self._fetch_ready and
-          not self._build_jobs and
-          not self._build_ready and
+          not self._jobs and
+          not self._ready and
           self._deps_map):
         # If we have failed on a package, retry it now.
         if self._retry_queue:
@@ -1495,40 +1417,8 @@
 
       target = job.target
 
-      if job.fetch_only:
-        if not job.done:
-          self._Print("Fetching %s (logged in %s)" % (target, job.filename))
-          self._fetch_jobs[job.target] = job
-        else:
-          state = self._state_map[job.target]
-          state.prefetched = True
-          state.fetched_successfully = (job.retcode == 0)
-          del self._fetch_jobs[job.target]
-          self._Print("Fetched %s in %2.2fs"
-                      % (target, time.time() - job.start_timestamp))
-
-          if self._show_output or job.retcode != 0:
-            self._print_queue.put(JobPrinter(job, unlink=True))
-          else:
-            os.unlink(job.filename)
-          # Failure or not, let build work with it next.
-          if not self._deps_map[job.target]["needs"]:
-            self._build_ready.put(state)
-            self._ScheduleLoop()
-
-          if self._fetch_ready:
-            state = self._fetch_ready.get()
-            self._fetch_queue.put(state)
-            self._fetch_jobs[state.target] = None
-          else:
-            # Minor optimization; shut down fetchers early since we know
-            # the queue is empty.
-            self._fetch_queue.put(None)
-          self._Status()
-        continue
-
       if not job.done:
-        self._build_jobs[target] = job
+        self._jobs[target] = job
         self._Print("Started %s (logged in %s)" % (target, job.filename))
         continue
 
@@ -1537,7 +1427,7 @@
         self._print_queue.put(JobPrinter(job, unlink=True))
       else:
         os.unlink(job.filename)
-      del self._build_jobs[target]
+      del self._jobs[target]
 
       seconds = time.time() - job.start_timestamp
       details = "%s (in %dm%.1fs)" % (target, seconds / 60, seconds % 60)