Refactor nag functionality into NagTimer class.

Add default 30-second nag timer to gclient subprocesses.

BUG=227537

Review URL: https://chromiumcodereview.appspot.com/14826003

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@198207 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/subprocess2.py b/subprocess2.py
index e817986..ac44555 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -132,6 +132,42 @@
   return env
 
 
+class NagTimer(object):
+  """
+  Triggers a callback when a time interval passes without an event being fired.
+
+  For example, the event could be receiving terminal output from a subprocess;
+  and the callback could print a warning to stderr that the subprocess appeared
+  to be hung.
+  """
+  def __init__(self, interval, cb):
+    self.interval = interval
+    self.cb = cb
+    self.timer = threading.Timer(self.interval, self.fn)
+    self.last_output = self.previous_last_output = 0
+
+  def start(self):
+    self.last_output = self.previous_last_output = time.time()
+    self.timer.start()
+
+  def event(self):
+    self.last_output = time.time()
+
+  def fn(self):
+    now = time.time()
+    if self.last_output == self.previous_last_output:
+      self.cb(now - self.previous_last_output)
+    # Use 0.1 fudge factor, just in case
+    #   (self.last_output - now) is very close to zero.
+    sleep_time = (self.last_output - now - 0.1) % self.interval
+    self.previous_last_output = self.last_output
+    self.timer = threading.Timer(sleep_time + 0.1, self.fn)
+    self.timer.start()
+
+  def cancel(self):
+    self.timer.cancel()
+
+
 class Popen(subprocess.Popen):
   """Wraps subprocess.Popen() with various workarounds.
 
@@ -192,6 +228,7 @@
     self.start = time.time()
     self.timeout = None
     self.nag_timer = None
+    self.nag_max = None
     self.shell = kwargs.get('shell', None)
     # Silence pylint on MacOSX
     self.returncode = None
@@ -230,8 +267,7 @@
     # because of memory exhaustion.
     queue = Queue.Queue()
     done = threading.Event()
-    timer = []
-    last_output = [time.time()] * 2
+    nag = None
 
     def write_stdin():
       try:
@@ -253,28 +289,12 @@
           data = pipe.read(1)
           if not data:
             break
-          last_output[0] = time.time()
+          if nag:
+            nag.event()
           queue.put((name, data))
       finally:
         queue.put(name)
 
-    def nag_fn():
-      now = time.time()
-      if done.is_set():
-        return
-      if last_output[0] == last_output[1]:
-        logging.warn('  No output for %.0f seconds from command:' % (
-            now - last_output[1]))
-        logging.warn('    %s' % self.cmd_str)
-      # Use 0.1 fudge factor in case:
-      #   now ~= last_output[0] + self.nag_timer
-      sleep_time = self.nag_timer + last_output[0] - now - 0.1
-      while sleep_time < 0:
-        sleep_time += self.nag_timer
-      last_output[1] = last_output[0]
-      timer[0] = threading.Timer(sleep_time, nag_fn)
-      timer[0].start()
-
     def timeout_fn():
       try:
         done.wait(self.timeout)
@@ -313,8 +333,15 @@
       t.start()
 
     if self.nag_timer:
-      timer.append(threading.Timer(self.nag_timer, nag_fn))
-      timer[0].start()
+      def _nag_cb(elapsed):
+        logging.warn('  No output for %.0f seconds from command:' % elapsed)
+        logging.warn('    %s' % self.cmd_str)
+        if (self.nag_max and
+            int('%.0f' % (elapsed / self.nag_timer)) >= self.nag_max):
+          queue.put('timeout')
+          done.set()  # Must do this so that timeout thread stops waiting.
+      nag = NagTimer(self.nag_timer, _nag_cb)
+      nag.start()
 
     timed_out = False
     try:
@@ -327,20 +354,22 @@
           self.stderr_cb(item[1])
         else:
           # A thread terminated.
-          threads[item].join()
-          del threads[item]
+          if item in threads:
+            threads[item].join()
+            del threads[item]
           if item == 'wait':
             # Terminate the timeout thread if necessary.
             done.set()
           elif item == 'timeout' and not timed_out and self.poll() is None:
-            logging.debug('Timed out after %fs: killing' % self.timeout)
+            logging.debug('Timed out after %.0fs: killing' % (
+                time.time() - self.start))
             self.kill()
             timed_out = True
     finally:
       # Stop the threads.
       done.set()
-      if timer:
-        timer[0].cancel()
+      if nag:
+        nag.cancel()
       if 'wait' in threads:
         # Accelerate things, otherwise it would hang until the child process is
         # done.
@@ -353,7 +382,8 @@
         self.returncode = TIMED_OUT
 
   # pylint: disable=W0221,W0622
-  def communicate(self, input=None, timeout=None, nag_timer=None):
+  def communicate(self, input=None, timeout=None, nag_timer=None,
+                  nag_max=None):
     """Adds timeout and callbacks support.
 
     Returns (stdout, stderr) like subprocess.Popen().communicate().
@@ -365,6 +395,7 @@
     """
     self.timeout = timeout
     self.nag_timer = nag_timer
+    self.nag_max = nag_max
     if (not self.timeout and not self.nag_timer and
         not self.stdout_cb and not self.stderr_cb):
       return super(Popen, self).communicate(input)
@@ -393,7 +424,7 @@
     return (stdout, stderr)
 
 
-def communicate(args, timeout=None, nag_timer=None, **kwargs):
+def communicate(args, timeout=None, nag_timer=None, nag_max=None, **kwargs):
   """Wraps subprocess.Popen().communicate() and add timeout support.
 
   Returns ((stdout, stderr), returncode).