Reimplement r109239 but using Popen.communicate() instead.

Enables threaded callback handlers and timeout support in subprocess2.Popen.communicate().

R=dpranke@chromium.org
BUG=
TEST=


Review URL: http://codereview.chromium.org/8749015

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@112465 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/subprocess2.py b/subprocess2.py
index c2f393a..3ad8047 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -8,12 +8,13 @@
 """
 
 from __future__ import with_statement
+import cStringIO
 import errno
 import logging
 import os
+import Queue
 import subprocess
 import sys
-import tempfile
 import time
 import threading
 
@@ -170,6 +171,10 @@
       tmp_str += ';  cwd=%s' % kwargs['cwd']
     logging.debug(tmp_str)
 
+    self.stdout_cb = None  # Callable given as stdout=, if any (see fix()).
+    self.stderr_cb = None  # Callable given as stderr=, if any (see fix()).
+    self.stdout_void = False  # True if stdout is redirected to os.devnull.
+    self.stderr_void = False  # True if stderr is redirected to os.devnull.
     def fix(stream):
       if kwargs.get(stream) in (VOID, os.devnull):
         # Replaces VOID with handle to /dev/null.
@@ -178,11 +183,17 @@
         # When the pipe fills up, it will deadlock this process. Using a real
         # file works around that issue.
         kwargs[stream] = open(os.devnull, 'w')
+        setattr(self, stream + '_void', True)
+      if callable(kwargs.get(stream)):
+        # Callable stdout/stderr should be used only with call() wrappers.
+        setattr(self, stream + '_cb', kwargs[stream])
+        kwargs[stream] = PIPE
 
     fix('stdout')
     fix('stderr')
 
     self.start = time.time()
+    self.timeout = None
     self.shell = kwargs.get('shell', None)
     # Silence pylint on MacOSX
     self.returncode = None
@@ -205,6 +216,152 @@
       # through
       raise
 
+  def _tee_threads(self, input):  # pylint: disable=W0622
+    """Does I/O for a process's pipes using threads.
+
+    Writes |input| to stdin then closes it, passes stdout/stderr data to
+    self.stdout_cb/self.stderr_cb from the calling thread, and kills the
+    process after self.timeout seconds, setting returncode to TIMED_OUT.
+
+    It's the simplest and slowest implementation. If a callback doesn't keep
+    up, the timeout effectiveness will be delayed accordingly.
+    """
+    # Queue of either <threadname> when a thread is done or (<threadname>,
+    # data). In theory it should be limited to ~64kb items so memory doesn't
+    # grow when a callback blocks, but that slows down processing on OSX10.6
+    # by 2x. Revisit if it becomes a problem, e.g. memory exhaustion.
+    queue = Queue.Queue()
+    done = threading.Event()
+
+    def write_stdin():
+      """Writes |input| to the child's stdin, then always closes it."""
+      try:
+        try:
+          if input:
+            self.stdin.write(input)
+        except IOError, e:
+          # Like subprocess.Popen.communicate(), ignore the error raised when
+          # the child exits before reading all its input (EINVAL on Windows).
+          if e.errno not in (errno.EPIPE, errno.EINVAL):
+            raise
+        finally:
+          self.stdin.close()
+      finally:
+        queue.put('stdin')
+
+    def _queue_pipe_read(pipe, name):
+      """Queues data read from a pipe; 1-byte reads forward it promptly."""
+      try:
+        while True:
+          data = pipe.read(1)
+          if not data:
+            break
+          queue.put((name, data))
+      finally:
+        queue.put(name)
+
+    def timeout_fn():
+      try:
+        done.wait(self.timeout)
+      finally:
+        queue.put('timeout')
+
+    def wait_fn():
+      try:
+        self.wait()
+      finally:
+        queue.put('wait')
+
+    # Starts up to 5 threads: wait for the process to quit, read stdout, read
+    # stderr, write stdin and timeout.
+    threads = {
+        'wait': threading.Thread(target=wait_fn),
+    }
+    if self.timeout is not None:
+      threads['timeout'] = threading.Thread(target=timeout_fn)
+    if self.stdout_cb:
+      threads['stdout'] = threading.Thread(
+          target=_queue_pipe_read, args=(self.stdout, 'stdout'))
+    if self.stderr_cb:
+      threads['stderr'] = threading.Thread(
+          target=_queue_pipe_read, args=(self.stderr, 'stderr'))
+    if self.stdin:
+      # Even without input, so stdin gets closed and the child sees EOF.
+      threads['stdin'] = threading.Thread(target=write_stdin)
+    for t in threads.itervalues():
+      t.start()
+
+    timed_out = False
+    try:
+      # This thread needs to be optimized for speed.
+      while threads:
+        item = queue.get()
+        if item[0] == 'stdout':
+          self.stdout_cb(item[1])
+        elif item[0] == 'stderr':
+          self.stderr_cb(item[1])
+        else:
+          # A thread terminated.
+          threads[item].join()
+          del threads[item]
+          if item == 'wait':
+            # Terminate the timeout thread if necessary.
+            done.set()
+          elif item == 'timeout' and self.returncode is None:
+            # Read returncode instead of calling poll(): a waitpid() racing
+            # the one in wait_fn could lose the child's exit status.
+            logging.debug('Timed out after %fs: killing', self.timeout)
+            self.kill()
+            timed_out = True
+    finally:
+      # Stop the threads.
+      done.set()
+      if 'wait' in threads and self.returncode is None:
+        # Otherwise the joins below would block until the child exits.
+        logging.debug('Killing child because of an exception')
+        self.kill()
+      # Join threads.
+      for thread in threads.itervalues():
+        thread.join()
+      if timed_out:
+        self.returncode = TIMED_OUT
+  def communicate(self, input=None, timeout=None): # pylint: disable=W0221,W0622
+    """Adds timeout and callbacks support.
+
+    Returns (stdout, stderr) like subprocess.Popen().communicate(); an item is
+    None when its stream isn't piped or is handed to a callback.
+
+    - The process will be killed after |timeout| seconds and returncode set to
+      TIMED_OUT.
+    - Raises TypeError if a timeout is combined with shell=True.
+    """
+    self.timeout = timeout
+    if not (self.timeout or self.stdout_cb or self.stderr_cb):
+      # Nothing special requested; the stock implementation is enough.
+      return super(Popen, self).communicate(input)
+
+    if self.timeout and self.shell:
+      raise TypeError(
+          'Using timeout and shell simultaneously will cause a process leak '
+          'since the shell will be killed instead of the child process.')
+
+    # Each piped stream without a callback is captured into a buffer by the
+    # reader threads; draining pipes this way avoids the deadlock described at
+    # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
+    # No locking is needed: callbacks only ever run on this thread.
+    out_buf = None
+    err_buf = None
+    if self.stdout and not (self.stdout_cb or self.stdout_void):
+      out_buf = cStringIO.StringIO()
+      self.stdout_cb = out_buf.write
+    if self.stderr and not (self.stderr_cb or self.stderr_void):
+      err_buf = cStringIO.StringIO()
+      self.stderr_cb = err_buf.write
+    self._tee_threads(input)
+    return (
+        out_buf.getvalue() if out_buf else None,
+        err_buf.getvalue() if err_buf else None)
+
 
 def communicate(args, timeout=None, **kwargs):
   """Wraps subprocess.Popen().communicate() and add timeout support.
@@ -226,39 +383,11 @@
       # set the Popen() parameter accordingly.
       kwargs['stdin'] = PIPE
 
-  if not timeout:
-    # Normal workflow.
-    proc = Popen(args, **kwargs)
-    if stdin is not None:
-      return proc.communicate(stdin), proc.returncode
-    else:
-      return proc.communicate(), proc.returncode
-
-  # Create a temporary file to workaround python's deadlock.
-  # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
-  # When the pipe fills up, it will deadlock this process. Using a real file
-  # works around that issue.
-  with tempfile.TemporaryFile() as buff:
-    kwargs['stdout'] = buff
-    proc = Popen(args, **kwargs)
-    if proc.shell:
-      raise TypeError(
-          'Using timeout and shell simultaneously will cause a process leak '
-          'since the shell will be killed instead of the child process.')
-    if stdin is not None:
-      proc.stdin.write(stdin)
-    while proc.returncode is None:
-      proc.poll()
-      if timeout and (time.time() - proc.start) > timeout:
-        proc.kill()
-        proc.wait()
-        # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
-        proc.returncode = TIMED_OUT
-      time.sleep(0.001)
-    # Now that the process died, reset the cursor and read the file.
-    buff.seek(0)
-    out = (buff.read(), None)
-  return out, proc.returncode
+  proc = Popen(args, **kwargs)
+  # VOID, like None, means there is no input to write to the child.
+  if stdin in (None, VOID):
+    stdin = None
+  return proc.communicate(stdin, timeout), proc.returncode
 
 
 def call(args, **kwargs):