Directly hook sys.stdout for thread annotated output.
In the next change, we can remove all the options.stdout bookkeeping since
it's now unnecessary.
TEST=none
BUG=none
Review URL: http://codereview.chromium.org/3398008
git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@59795 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/gclient_utils.py b/gclient_utils.py
index aeaebc2..924f0a5 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -297,8 +297,10 @@
def SoftClone(obj):
"""Clones an object. copy.copy() doesn't work on 'file' objects."""
- class NewObject(object): pass
- new_obj = NewObject()
+ if obj.__class__.__name__ == 'SoftCloned':
+ return obj
+ class SoftCloned(object): pass
+ new_obj = SoftCloned()
for member in dir(obj):
if member.startswith('_'):
continue
@@ -314,10 +316,11 @@
return fileobj
new_fileobj = SoftClone(fileobj)
- new_fileobj.lock = threading.Lock()
+ if not hasattr(new_fileobj, 'lock'):
+ new_fileobj.lock = threading.Lock()
new_fileobj.last_flushed_at = time.time()
new_fileobj.delay = delay
- new_fileobj.old_auto_flush_write = fileobj.write
+ new_fileobj.old_auto_flush_write = new_fileobj.write
# Silence pylint.
new_fileobj.flush = fileobj.flush
@@ -339,27 +342,68 @@
return new_fileobj
-class StdoutAnnotated(object):
- """Prepends every line with a string."""
- def __init__(self, prepend, stdout):
- self.prepend = prepend
- self.buf = ''
- self.stdout = stdout
+def MakeFileAnnotated(fileobj):
+ """Creates a file object clone that automatically prepends every line written
+ from worker threads with a NN> prefix."""
+ if hasattr(fileobj, 'output_buffers'):
+ # Already patched.
+ return fileobj
- def write(self, out):
- self.buf += out
- while '\n' in self.buf:
- line, self.buf = self.buf.split('\n', 1)
- self.stdout.write(self.prepend + line + '\n')
+ new_fileobj = SoftClone(fileobj)
+ if not hasattr(new_fileobj, 'lock'):
+ new_fileobj.lock = threading.Lock()
+ new_fileobj.output_buffers = {}
+ new_fileobj.old_annotated_write = new_fileobj.write
- def flush(self):
- pass
+ def annotated_write(out):
+ index = getattr(threading.currentThread(), 'index', None)
+ if index is None:
+ # Unindexed threads aren't buffered.
+ new_fileobj.old_annotated_write(out)
+ return
- def full_flush(self):
- if self.buf:
- self.stdout.write(self.prepend + self.buf)
- self.stdout.flush()
- self.buf = ''
+ new_fileobj.lock.acquire()
+ try:
+ # Use a dummy array to hold the string so the code can be lockless.
+ # Strings are immutable, requiring to keep a lock for the whole dictionary
+ # otherwise. Using an array is faster than using a dummy object.
+ if not index in new_fileobj.output_buffers:
+ obj = new_fileobj.output_buffers[index] = ['']
+ else:
+ obj = new_fileobj.output_buffers[index]
+ finally:
+ new_fileobj.lock.release()
+
+ # Continue lockless.
+ obj[0] += out
+ while '\n' in obj[0]:
+ line, remaining = obj[0].split('\n', 1)
+ new_fileobj.old_annotated_write('%d>%s\n' % (index, line))
+ obj[0] = remaining
+
+ def full_flush():
+ """Flushes partial lines buffered for worker threads that no longer exist."""
+ orphans = []
+ new_fileobj.lock.acquire()
+ try:
+ # Collect live thread indexes in a set; a generator is consumed only once.
+ indexes = (getattr(t, 'index', None) for t in threading.enumerate())
+ indexed = set(filter(None, indexes))
+ for index in new_fileobj.output_buffers:
+ if index not in indexed:
+ orphans.append((index, new_fileobj.output_buffers[index][0]))
+ for orphan in orphans:
+ del new_fileobj.output_buffers[orphan[0]]
+ finally:
+ new_fileobj.lock.release()
+
+ # Don't keep the lock while writing. Will append \n when it shouldn't.
+ for orphan in orphans:
+ new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1]))
+
+ new_fileobj.write = annotated_write
+ new_fileobj.full_flush = full_flush
+ return new_fileobj
def CheckCallAndFilter(args, stdout=None, filter_fn=None,
@@ -628,12 +672,10 @@
if self.jobs > 1:
# Start the thread.
index = len(self.ran) + len(self.running) + 1
- # Copy 'options' and add annotated stdout.
+ # Copy 'options'.
task_kwargs = kwargs.copy()
task_kwargs['options'] = copy.copy(task_kwargs['options'])
- task_kwargs['options'].stdout = StdoutAnnotated(
- '%d>' % index, task_kwargs['options'].stdout)
- new_thread = self._Worker(task_item, args, task_kwargs)
+ new_thread = self._Worker(task_item, index, args, task_kwargs)
self.running.append(new_thread)
new_thread.start()
else:
@@ -646,10 +688,11 @@
class _Worker(threading.Thread):
"""One thread to execute one WorkItem."""
- def __init__(self, item, args, kwargs):
+ def __init__(self, item, index, args, kwargs):
threading.Thread.__init__(self, name=item.name or 'Worker')
logging.info(item.name)
self.item = item
+ self.index = index
self.args = args
self.kwargs = kwargs