Revamped terminal output for update.

Features:

- Non-verbose output is now limited to a one-line progress
indicator.

- Verbose output is now collated per subprocess.  As soon as a
subprocess finishes, its full output is dumped to the terminal.

- Verbose output is prefixed with timestamps representing elapsed
time since the beginning of the gclient invocation.

- git progress indicators ("Receiving objects", etc.) are limited to
one line every 10 seconds (see the sketch after this list).

- In both verbose and non-verbose modes, if a failure occurs, the
full output of the failed update operation is dumped to the terminal
just before exit.

- If updates are progressing, but only slowly, "Still working"
messages are printed periodically to pacify users and buildbots.
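
Roughly how the new pieces fit together (an illustrative sketch, not
the actual call sites; the real wiring elsewhere in gclient gives each
dependency its own buffer and filter, and the details may differ):

    import cStringIO

    import gclient_utils

    # Each work item buffers its subprocess output privately; git progress
    # lines are throttled to one every 10 seconds and prefixed with the
    # elapsed time since gclient started, e.g. '[0:01:23] '.
    outbuf = cStringIO.StringIO()
    filter_fn = gclient_utils.GitFilter(time_throttle=10, out_fh=outbuf)
    filter_fn('Receiving objects:  10% (1/10)')  # kept (new progress prefix)
    filter_fn('Receiving objects:  20% (2/10)')  # throttled: same prefix, < 10s

    # In verbose mode the queue dumps an item's buffer as soon as that item
    # finishes; on failure the buffer is dumped regardless of verbosity.
    queue = gclient_utils.ExecutionQueue(
        jobs=8, progress=None, ignore_requirements=False, verbose=True)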

BUG=
R=hinoka@google.com

Review URL: https://codereview.chromium.org/227163002

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@262500 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/gclient_utils.py b/gclient_utils.py
index 3517946..f89b601 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -6,6 +6,7 @@
 
 import codecs
 import cStringIO
+import datetime
 import logging
 import os
 import pipes
@@ -25,6 +26,7 @@
 
 RETRY_MAX = 3
 RETRY_INITIAL_SLEEP = 0.5
+START = datetime.datetime.now()
 
 
 _WARNINGS = []
@@ -47,6 +49,12 @@
     super(Error, self).__init__(msg, *args, **kwargs)
 
 
+def Elapsed(until=None):
+  if until is None:
+    until = datetime.datetime.now()
+  return str(until - START).partition('.')[0]
+
+
 def PrintWarnings():
   """Prints any accumulated warnings."""
   if _WARNINGS:
@@ -483,12 +491,8 @@
           output.write(in_byte)
           if print_stdout:
             stdout.write(in_byte)
-          if in_byte != '\r':
-            if in_byte != '\n':
-              in_line += in_byte
-            else:
-              filter_fn(in_line)
-              in_line = ''
+          if in_byte not in ['\r', '\n']:
+            in_line += in_byte
           else:
             filter_fn(in_line)
             in_line = ''
@@ -525,9 +529,9 @@
   Allows a custom function to skip certain lines (predicate), and will throttle
   the output of percentage completed lines to only output every X seconds.
   """
-  PERCENT_RE = re.compile('.* ([0-9]{1,2})% .*')
+  PERCENT_RE = re.compile('(.*) ([0-9]{1,3})% .*')
 
-  def __init__(self, time_throttle=0, predicate=None):
+  def __init__(self, time_throttle=0, predicate=None, out_fh=None):
     """
     Args:
       time_throttle (int): GitFilter will throttle 'noisy' output (such as the
@@ -535,10 +539,13 @@
         seconds apart.
       predicate (f(line)): An optional function which is invoked for every line.
         The line will be skipped if predicate(line) returns False.
+      out_fh: File handle to write output to.
     """
     self.last_time = 0
     self.time_throttle = time_throttle
     self.predicate = predicate
+    self.out_fh = out_fh or sys.stdout
+    self.progress_prefix = None
 
   def __call__(self, line):
     # git uses an escape sequence to clear the line; elide it.
@@ -549,11 +556,14 @@
       return
     now = time.time()
     match = self.PERCENT_RE.match(line)
-    if not match:
-      self.last_time = 0
-    if (now - self.last_time) >= self.time_throttle:
-      self.last_time = now
-      print line
+    if match:
+      if match.group(1) != self.progress_prefix:
+        self.progress_prefix = match.group(1)
+      elif now - self.last_time < self.time_throttle:
+        return
+    self.last_time = now
+    self.out_fh.write('[%s] ' % Elapsed())
+    print >> self.out_fh, line
 
 
 def FindGclientRoot(from_dir, filename='.gclient'):
@@ -683,6 +693,8 @@
   def __init__(self, name):
     # A unique string representing this work item.
     self._name = name
+    self.outbuf = cStringIO.StringIO()
+    self.start = self.finish = None
 
   def run(self, work_queue):
     """work_queue is passed as keyword argument so it should be
@@ -704,7 +716,7 @@
 
   Methods of this class are thread safe.
   """
-  def __init__(self, jobs, progress, ignore_requirements):
+  def __init__(self, jobs, progress, ignore_requirements, verbose=False):
     """jobs specifies the number of concurrent tasks to allow. progress is a
     Progress instance."""
     # Set when a thread is done or a new item is enqueued.
@@ -725,6 +737,9 @@
       self.progress.update(0)
 
     self.ignore_requirements = ignore_requirements
+    self.verbose = verbose
+    self.last_join = None
+    self.last_subproc_output = None
 
   def enqueue(self, d):
     """Enqueue one Dependency to be executed later once its requirements are
@@ -743,9 +758,30 @@
     finally:
       self.ready_cond.release()
 
+  def out_cb(self, _):
+    self.last_subproc_output = datetime.datetime.now()
+    return True
+
+  @staticmethod
+  def format_task_output(task, comment=''):
+    if comment:
+      comment = ' (%s)' % comment
+    if task.start and task.finish:
+      elapsed = ' (Elapsed: %s)' % (
+          str(task.finish - task.start).partition('.')[0])
+    else:
+      elapsed = ''
+    return """
+%s%s%s
+----------------------------------------
+%s
+----------------------------------------""" % (
+    task.name, comment, elapsed, task.outbuf.getvalue().strip())
+
   def flush(self, *args, **kwargs):
     """Runs all enqueued items until all are executed."""
     kwargs['work_queue'] = self
+    self.last_subproc_output = self.last_join = datetime.datetime.now()
     self.ready_cond.acquire()
     try:
       while True:
@@ -778,6 +814,17 @@
         # We need to poll here otherwise Ctrl-C isn't processed.
         try:
           self.ready_cond.wait(10)
+          # If we haven't printed to terminal for a while, but we have received
+          # spew from a subprocess, let the user know we're still progressing.
+          now = datetime.datetime.now()
+          if (now - self.last_join > datetime.timedelta(seconds=60) and
+              self.last_subproc_output > self.last_join):
+            if self.progress:
+              print >> sys.stdout, ''
+            elapsed = Elapsed()
+            print >> sys.stdout, '[%s] Still working on:' % elapsed
+            for task in self.running:
+              print >> sys.stdout, '[%s]   %s' % (elapsed, task.item.name)
         except KeyboardInterrupt:
           # Help debugging by printing some information:
           print >> sys.stderr, (
@@ -788,7 +835,10 @@
               ', '.join(self.ran),
               len(self.running)))
           for i in self.queued:
-            print >> sys.stderr, '%s: %s' % (i.name, ', '.join(i.requirements))
+            print >> sys.stderr, '%s (not started): %s' % (
+                i.name, ', '.join(i.requirements))
+          for i in self.running:
+            print >> sys.stderr, self.format_task_output(i.item, 'interrupted')
           raise
         # Something happened: self.enqueue() or a thread terminated. Loop again.
     finally:
@@ -796,11 +846,14 @@
 
     assert not self.running, 'Now guaranteed to be single-threaded'
     if not self.exceptions.empty():
+      if self.progress:
+        print >> sys.stdout, ''
       # To get back the stack location correctly, the raise a, b, c form must be
       # used, passing a tuple as the first argument doesn't work.
-      e = self.exceptions.get()
+      e, task = self.exceptions.get()
+      print >> sys.stderr, self.format_task_output(task.item, 'ERROR')
       raise e[0], e[1], e[2]
-    if self.progress:
+    elif self.progress:
       self.progress.end()
 
   def _flush_terminated_threads(self):
@@ -812,7 +865,10 @@
         self.running.append(t)
       else:
         t.join()
+        self.last_join = datetime.datetime.now()
         sys.stdout.flush()
+        if self.verbose:
+          print >> sys.stdout, self.format_task_output(t.item)
         if self.progress:
           self.progress.update(1, t.item.name)
         if t.item.name in self.ran:
@@ -832,10 +888,26 @@
     else:
       # Run the 'thread' inside the main thread. Don't try to catch any
       # exception.
-      task_item.run(*args, **kwargs)
-      self.ran.append(task_item.name)
-      if self.progress:
-        self.progress.update(1, ', '.join(t.item.name for t in self.running))
+      try:
+        task_item.start = datetime.datetime.now()
+        print >> task_item.outbuf, '[%s] Started.' % Elapsed(task_item.start)
+        task_item.run(*args, **kwargs)
+        task_item.finish = datetime.datetime.now()
+        print >> task_item.outbuf, '[%s] Finished.' % Elapsed(task_item.finish)
+        self.ran.append(task_item.name)
+        if self.verbose:
+          if self.progress:
+            print >> sys.stdout, ''
+          print >> sys.stdout, self.format_task_output(task_item)
+        if self.progress:
+          self.progress.update(1, ', '.join(t.item.name for t in self.running))
+      except KeyboardInterrupt:
+        print >> sys.stderr, self.format_task_output(task_item, 'interrupted')
+        raise
+      except Exception:
+        print >> sys.stderr, self.format_task_output(task_item, 'ERROR')
+        raise
+
 
   class _Worker(threading.Thread):
     """One thread to execute one WorkItem."""
@@ -853,17 +925,21 @@
       logging.debug('_Worker.run(%s)' % self.item.name)
       work_queue = self.kwargs['work_queue']
       try:
+        self.item.start = datetime.datetime.now()
+        print >> self.item.outbuf, '[%s] Started.' % Elapsed(self.item.start)
         self.item.run(*self.args, **self.kwargs)
+        self.item.finish = datetime.datetime.now()
+        print >> self.item.outbuf, '[%s] Finished.' % Elapsed(self.item.finish)
       except KeyboardInterrupt:
         logging.info('Caught KeyboardInterrupt in thread %s', self.item.name)
         logging.info(str(sys.exc_info()))
-        work_queue.exceptions.put(sys.exc_info())
+        work_queue.exceptions.put((sys.exc_info(), self))
         raise
       except Exception:
         # Catch exception location.
         logging.info('Caught exception in thread %s', self.item.name)
         logging.info(str(sys.exc_info()))
-        work_queue.exceptions.put(sys.exc_info())
+        work_queue.exceptions.put((sys.exc_info(), self))
       finally:
         logging.info('_Worker.run(%s) done', self.item.name)
         work_queue.ready_cond.acquire()