Add callback support for stdout and stderr.

It's currently an inefficient thread-based implementation. Interestingly
enough, callback support is significantly faster on cygwin than on
native python.

Writing efficient implementations is deferred to later changes,
one per platform.

Stops using a temporary file since it's not necessary anymore.

The goal is to reduce the number of places where a similar paradigm
is used by having a canonical generic implementation.

R=dpranke@chromium.org
BUG=
TEST=Tested manually on Windows, cygwin, linux, OSX 10.6


Review URL: http://codereview.chromium.org/8374026

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@109239 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/subprocess2.py b/subprocess2.py
index 4708aad..810631a 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -8,12 +8,13 @@
 """
 
 from __future__ import with_statement
+import cStringIO
 import errno
 import logging
 import os
+import Queue
 import subprocess
 import sys
-import tempfile
 import time
 import threading
 
@@ -176,6 +177,9 @@
       # When the pipe fills up, it will deadlock this process. Using a real file
       # works around that issue.
       kwargs[stream] = open(os.devnull, 'w')
+    if callable(kwargs.get(stream)):
+      # Callable stdout/stderr should be used only with call() wrappers.
+      kwargs[stream] = PIPE
 
   fix('stdout')
   fix('stderr')
@@ -198,6 +202,140 @@
     raise
 
 
+def _queue_pipe_read(pipe, name, done, dest):
+  """Reads |pipe| one character at a time, putting (name, data) into |dest|.
+
+  Stops at EOF or when |done| is set (checked before each read), then puts
+  |name| alone as an end marker. Module level to avoid closure lookup cost.
+  """
+  while not done.isSet():
+    data = pipe.read(1)
+    if not data:
+      break
+    dest.put((name, data))
+  dest.put(name)
+
+
+def _tee_threads(proc, timeout, start, stdin, args, kwargs):
+  """Does I/O for a process's pipes using threads.
+
+  Simplest and slowest implementation. Callable kwargs['stdout'/'stderr'] get
+  the pipe data, called from this thread; a str |stdin| is written to the
+  child. A slow callback delays the timeout. |start| and |args| are unused.
+  Returns TIMED_OUT if the process was killed on timeout, else its returncode.
+  """
+  # TODO(maruel): Implement a select based implementation on POSIX and a Windows
+  # one using WaitForMultipleObjects().
+  #
+  # Queue items: <threadname> when a thread is done, or (<threadname>, data).
+  # In theory we would like to limit to ~64kb items to not cause large memory
+  # usage when the callback blocks. It is not done because it slows down
+  # processing on OSX10.6 by a factor of 2x, making it even slower than Windows!
+  # Revisit this decision if it becomes a problem, e.g. crash because of
+  # memory exhaustion.
+  queue = Queue.Queue()
+  done = threading.Event()
+
+  def write_stdin():
+    stdin_io = cStringIO.StringIO(stdin)
+    while not done.isSet():
+      data = stdin_io.read(1024)
+      if data:
+        proc.stdin.write(data)
+      else:
+        proc.stdin.close()
+        break
+    queue.put('stdin')
+
+  def timeout_fn():
+    done.wait(timeout)
+    # No need to close the pipes since killing should be sufficient.
+    queue.put('timeout')
+
+  # Starts up to 4 threads:
+  # Read stdout
+  # Read stderr
+  # Write stdin
+  # Timeout
+  threads = {}
+  if timeout is not None:
+    threads['timeout'] = threading.Thread(target=timeout_fn)
+  if callable(kwargs.get('stdout')):
+    threads['stdout'] = threading.Thread(
+      target=_queue_pipe_read, args=(proc.stdout, 'stdout', done, queue))
+  if callable(kwargs.get('stderr')):
+    threads['stderr'] = threading.Thread(
+      target=_queue_pipe_read,
+      args=(proc.stderr, 'stderr', done, queue))
+  if isinstance(stdin, str):
+    threads['stdin'] = threading.Thread(target=write_stdin)
+  for t in threads.itervalues():
+    t.daemon = True
+    t.start()
+
+  timed_out = False
+  try:
+    while proc.returncode is None:
+      assert threads
+      proc.poll()
+      item = queue.get()
+      if isinstance(item, str):
+        threads[item].join()
+        del threads[item]
+        if item == 'timeout' and not timed_out and proc.poll() is None:
+          logging.debug('Timed out: killing')
+          proc.kill()
+          timed_out = True
+        if not threads:
+          # We won't be woken up anymore; stop waiting on the queue.
+          break
+      else:
+        kwargs[item[0]](item[1])
+  finally:
+    # Stop the threads.
+    done.set()
+    # Join threads
+    for thread in threads.itervalues():
+      thread.join()
+
+  # Flush the queue: deliver whatever is still queued now the threads stopped.
+  try:
+    while True:
+      item = queue.get(False)
+      if isinstance(item, str):
+        if item == 'timeout':
+          # TODO(maruel): Does it make sense at that point?
+          if not timed_out and proc.poll() is None:
+            logging.debug('Timed out: killing')
+            proc.kill()
+            timed_out = True
+      else:
+        kwargs[item[0]](item[1])
+  except Queue.Empty:
+    pass
+
+  # Get the remainder still in the pipes and hand it to the callbacks.
+  if callable(kwargs.get('stdout')):
+    data = proc.stdout.read()
+    while data:
+      kwargs['stdout'](data)
+      data = proc.stdout.read()
+  if callable(kwargs.get('stderr')):
+    data = proc.stderr.read()
+    while data:
+      kwargs['stderr'](data)
+      data = proc.stderr.read()
+
+  if proc.returncode is None:
+    # Usually happens when killed with timeout but not listening to pipes.
+    proc.wait()
+
+  if timed_out:
+    return TIMED_OUT
+
+  return proc.returncode
+
+
 def communicate(args, timeout=None, **kwargs):
   """Wraps subprocess.Popen().communicate().
 
@@ -207,6 +345,11 @@
     TIMED_OUT.
   - Automatically passes stdin content as input so do not specify stdin=PIPE.
   """
+  if timeout and kwargs.get('shell'):
+    raise TypeError(
+        'Using timeout and shell simultaneously will cause a process leak '
+        'since the shell will be killed instead of the child process.')
+
   stdin = kwargs.pop('stdin', None)
   if stdin is not None:
     if stdin is VOID:
@@ -218,36 +361,37 @@
       # set the Popen() parameter accordingly.
       kwargs['stdin'] = PIPE
 
-  if not timeout:
+  start = time.time()
+  proc = Popen(args, **kwargs)
+  need_buffering = (timeout or
+      callable(kwargs.get('stdout')) or callable(kwargs.get('stderr')))
+
+  if not need_buffering:
     # Normal workflow.
-    proc = Popen(args, **kwargs)
-    if stdin is not None:
+    if stdin not in (None, VOID):
       return proc.communicate(stdin), proc.returncode
     else:
       return proc.communicate(), proc.returncode
 
-  # Create a temporary file to workaround python's deadlock.
+  stdout = None
+  stderr = None
+  # Convert to a callback to work around python's deadlock.
   # http://docs.python.org/library/subprocess.html#subprocess.Popen.wait
-  # When the pipe fills up, it will deadlock this process. Using a real file
-  # works around that issue.
-  with tempfile.TemporaryFile() as buff:
-    start = time.time()
-    kwargs['stdout'] = buff
-    proc = Popen(args, **kwargs)
-    if stdin is not None:
-      proc.stdin.write(stdin)
-    while proc.returncode is None:
-      proc.poll()
-      if timeout and (time.time() - start) > timeout:
-        proc.kill()
-        proc.wait()
-        # It's -9 on linux and 1 on Windows. Standardize to TIMED_OUT.
-        proc.returncode = TIMED_OUT
-      time.sleep(0.001)
-    # Now that the process died, reset the cursor and read the file.
-    buff.seek(0)
-    out = [buff.read(), None]
-  return out, proc.returncode
+  # When the pipe fills up, it will deadlock this process. Using a thread
+  # works around that issue. The callbacks need not be thread safe since they
+  # are guaranteed to be called from the main thread.
+  if kwargs.get('stdout') == PIPE:
+    stdout = []
+    kwargs['stdout'] = stdout.append
+  if kwargs.get('stderr') == PIPE:
+    stderr = []
+    kwargs['stderr'] = stderr.append
+  returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
+  if not stdout is None:
+    stdout = ''.join(stdout)
+  if not stderr is None:
+    stderr = ''.join(stderr)
+  return (stdout, stderr), returncode
 
 
 def call(args, **kwargs):