Implement accelerated tee support for POSIX.
This removes the need for threading entirely, which eliminates contention on the
GIL.
Taking S2Test.test_check_output_tee_large as the baseline, the numbers below are
approximate speedup, then real/user/sys times in seconds (before -> after):
Ubuntu workstation: ~25x 2.4/1.9/1.5 -> 0.10/0.70/0.02
OSX 10.6 laptop: ~40x 6.4/5.3/3.9 -> 0.15/0.80/0.07
Cygwin on win7: ~4x 2.8/2.2/1.3 -> 0.60/0.16/0.30
R=dpranke@chromium.org
BUG=
TEST=
Review URL: http://codereview.chromium.org/8462008
git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@109283 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/subprocess2.py b/subprocess2.py
index 810631a..430010c 100644
--- a/subprocess2.py
+++ b/subprocess2.py
@@ -13,11 +13,16 @@
import logging
import os
import Queue
+import select
import subprocess
import sys
import time
import threading
+if sys.platform != 'win32':
+ import fcntl
+
+
# Constants forwarded from subprocess.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
@@ -203,7 +208,7 @@
def _queue_pipe_read(pipe, name, done, dest):
- """Queue characters read from a pipe into a queue.
+ """Queues characters read from a pipe into a queue.
Left outside the _tee_threads function to not introduce a function closure
to speed up variable lookup.
@@ -336,6 +341,80 @@
return proc.returncode
+def _read_pipe(handles, pipe, out_fn):
+ """Reads bytes from a pipe and calls the output callback."""
+ data = pipe.read()
+ if not data:
+ del handles[pipe]
+ else:
+ out_fn(data)
+
+
+def _tee_posix(proc, timeout, start, stdin, args, kwargs):
+ """Polls a process and its pipe using select.select().
+
+ TODO(maruel): Implement a non-polling method for OSes that support it.
+ """
+ handles_r = {}
+ if callable(kwargs.get('stdout')):
+ handles_r[proc.stdout] = lambda: _read_pipe(
+ handles_r, proc.stdout, kwargs['stdout'])
+ if callable(kwargs.get('stderr')):
+ handles_r[proc.stderr] = lambda: _read_pipe(
+ handles_r, proc.stderr, kwargs['stderr'])
+
+ handles_w = {}
+ if isinstance(stdin, str):
+ stdin_io = cStringIO.StringIO(stdin)
+ def write_stdin():
+ data = stdin_io.read(1)
+ if data:
+ proc.stdin.write(data)
+ else:
+ del handles_w[proc.stdin]
+ proc.stdin.close()
+ handles_w[proc.stdin] = write_stdin
+ else:
+ # TODO(maruel): Fix me, it could be VOID.
+ assert stdin is None
+
+ # Make all the file objects of the child process non-blocking file.
+ # TODO(maruel): Test if a pipe is handed to the child process.
+ for pipe in (proc.stdin, proc.stdout, proc.stderr):
+ fileno = pipe and getattr(pipe, 'fileno', lambda: None)()
+ if fileno:
+ # Note: making a pipe non-blocking means the C stdio could act wrong. In
+ # particular, readline() cannot be used. Work around is to use os.read().
+ fl = fcntl.fcntl(fileno, fcntl.F_GETFL)
+ fcntl.fcntl(fileno, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+
+ timed_out = False
+ while handles_r or handles_w or (timeout and proc.poll() is None):
+ period = None
+ if timeout:
+ period = max(0, timeout - (time.time() - start))
+ if not period and not timed_out:
+ proc.kill()
+ timed_out = True
+ if timed_out:
+ period = 0.001
+
+ # It reconstructs objects on each loop, not very efficient.
+ reads, writes, _, = select.select(
+ handles_r.keys(), handles_w.keys(), [], period)
+ for read in reads:
+ handles_r[read]()
+ for write in writes:
+ handles_w[write]()
+
+ # No pipe open anymore and if there was a time out, the child process was
+ # killed already.
+ proc.wait()
+ if timed_out:
+ return TIMED_OUT
+ return proc.returncode
+
+
def communicate(args, timeout=None, **kwargs):
"""Wraps subprocess.Popen().communicate().
@@ -386,7 +465,12 @@
if kwargs.get('stderr') == PIPE:
stderr = []
kwargs['stderr'] = stderr.append
- returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
+ if sys.platform == 'win32':
+ # On cygwin, ctypes._FUNCFLAG_STDCALL, which is used by ctypes.WINFUNCTYPE,
+ # doesn't exist so _tee_win() cannot be used yet.
+ returncode = _tee_threads(proc, timeout, start, stdin, args, kwargs)
+ else:
+ returncode = _tee_posix(proc, timeout, start, stdin, args, kwargs)
if not stdout is None:
stdout = ''.join(stdout)
if not stderr is None: