Implement accelerated tee support for POSIX.

This removes all the need of threading, which removes the contention on the GIL
lock.

Taking S2Test.test_check_output_tee_large as baseline, numbers are
real/user/sys in seconds:
Ubuntu workstation: ~25x  2.4/1.9/1.5 -> 0.10/0.70/0.02
OSX 10.6 laptop:    ~40x  6.4/5.3/3.9 -> 0.15/0.80/0.07
Cygwin on win7:      ~4x  2.8/2.2/1.3 -> 0.60/0.16/0.30

R=dpranke@chromium.org
BUG=
TEST=


Review URL: http://codereview.chromium.org/8462008

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@109283 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/subprocess2.py b/subprocess2.py
index 810631a..430010c 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -13,11 +13,16 @@
 import logging
 import os
 import Queue
+import select
 import subprocess
 import sys
 import time
 import threading
 
+if sys.platform != 'win32':
+  import fcntl
+
+
 # Constants forwarded from subprocess.
 PIPE = subprocess.PIPE
 STDOUT = subprocess.STDOUT
@@ -203,7 +208,7 @@
 
 
 def _queue_pipe_read(pipe, name, done, dest):
-  """Queue characters read from a pipe into a queue.
+  """Queues characters read from a pipe into a queue.
 
   Left outside the _tee_threads function to not introduce a function closure
   to speed up variable lookup.
@@ -336,6 +341,80 @@
   return proc.returncode
 
 
+def _read_pipe(handles, pipe, out_fn):
+  """Reads bytes from a pipe and calls the output callback."""
+  data = pipe.read()
+  if not data:
+    del handles[pipe]
+  else:
+    out_fn(data)
+
+
+def _tee_posix(proc, timeout, start, stdin, args, kwargs):
+  """Polls a process and its pipe using select.select().
+
+  TODO(maruel): Implement a non-polling method for OSes that support it.
+  """
+  handles_r = {}
+  if callable(kwargs.get('stdout')):
+    handles_r[proc.stdout] = lambda: _read_pipe(
+        handles_r, proc.stdout, kwargs['stdout'])
+  if callable(kwargs.get('stderr')):
+    handles_r[proc.stderr] = lambda: _read_pipe(
+        handles_r, proc.stderr, kwargs['stderr'])
+
+  handles_w = {}
+  if isinstance(stdin, str):
+    stdin_io = cStringIO.StringIO(stdin)
+    def write_stdin():
+      data = stdin_io.read(1)
+      if data:
+        proc.stdin.write(data)
+      else:
+        del handles_w[proc.stdin]
+        proc.stdin.close()
+    handles_w[proc.stdin] = write_stdin
+  else:
+    # TODO(maruel): Fix me, it could be VOID.
+    assert stdin is None
+
+  # Make all the file objects of the child process non-blocking file.
+  # TODO(maruel): Test if a pipe is handed to the child process.
+  for pipe in (proc.stdin, proc.stdout, proc.stderr):
+    fileno = pipe and getattr(pipe, 'fileno', lambda: None)()
+    if fileno:
+      # Note: making a pipe non-blocking means the C stdio could act wrong. In
+      # particular, readline() cannot be used. Work around is to use os.read().
+      fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
+      fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+
+  timed_out = False
+  while handles_r or handles_w or (timeout and proc.poll() is None):
+    period = None
+    if timeout:
+      period = max(0, timeout - (time.time() - start))
+      if not period and not timed_out:
+        proc.kill()
+        timed_out = True
+    if timed_out:
+      period = 0.001
+
+    # It reconstructs objects on each loop, not very efficient.
+    reads, writes, _, = select.select(
+        handles_r.keys(), handles_w.keys(), [], period)
+    for read in reads:
+      handles_r[read]()
+    for write in writes:
+      handles_w[write]()
+
+  # No pipe open anymore and if there was a time out, the child process was
+  # killed already.
+  proc.wait()
+  if timed_out:
+    return TIMED_OUT
+  return proc.returncode
+
+
 def communicate(args, timeout=None, **kwargs):
   """Wraps subprocess.Popen().communicate().
 
@@ -386,7 +465,12 @@
   if kwargs.get('stderr') == PIPE:
     stderr = []
     kwargs['stderr'] = stderr.append
-  returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
+  if sys.platform == 'win32':
+    # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE,
+    # doesn't exist so _tee_win() cannot be used yet.
+    returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
+  else:
+    returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs)
   if not stdout is None:
     stdout = ''.join(stdout)
   if not stderr is None: