Directly hook sys.stdout for thread annotated output.

In the next change, we can now remove all the options.stdout bookeeping since
it's not unnecessary.

TEST=none
BUG=none

Review URL: http://codereview.chromium.org/3398008

git-svn-id: svn://svn.chromium.org/chrome/trunk/tools/depot_tools@59795 0039d316-1c4b-4281-b951-d872f2087c98
diff --git a/gclient_utils.py b/gclient_utils.py
index aeaebc2..924f0a5 100644
--- a/gclient_utils.py
+++ b/gclient_utils.py
@@ -297,8 +297,10 @@
 
 def SoftClone(obj):
   """Clones an object. copy.copy() doesn't work on 'file' objects."""
-  class NewObject(object): pass
-  new_obj = NewObject()
+  if obj.__class__.__name__ == 'SoftCloned':
+    return obj
+  class SoftCloned(object): pass
+  new_obj = SoftCloned()
   for member in dir(obj):
     if member.startswith('_'):
       continue
@@ -314,10 +316,11 @@
     return fileobj
 
   new_fileobj = SoftClone(fileobj)
-  new_fileobj.lock = threading.Lock()
+  if not hasattr(new_fileobj, 'lock'):
+    new_fileobj.lock = threading.Lock()
   new_fileobj.last_flushed_at = time.time()
   new_fileobj.delay = delay
-  new_fileobj.old_auto_flush_write = fileobj.write
+  new_fileobj.old_auto_flush_write = new_fileobj.write
   # Silence pylint.
   new_fileobj.flush = fileobj.flush
 
@@ -339,27 +342,68 @@
   return new_fileobj
 
 
-class StdoutAnnotated(object):
-  """Prepends every line with a string."""
-  def __init__(self, prepend, stdout):
-    self.prepend = prepend
-    self.buf = ''
-    self.stdout = stdout
+def MakeFileAnnotated(fileobj):
+  """Creates a file object clone to automatically prepends every line in worker
+  threads with a NN> prefix."""
+  if hasattr(fileobj, 'output_buffers'):
+    # Already patched.
+    return fileobj
 
-  def write(self, out):
-    self.buf += out
-    while '\n' in self.buf:
-      line, self.buf = self.buf.split('\n', 1)
-      self.stdout.write(self.prepend + line + '\n')
+  new_fileobj = SoftClone(fileobj)
+  if not hasattr(new_fileobj, 'lock'):
+    new_fileobj.lock = threading.Lock()
+  new_fileobj.output_buffers = {}
+  new_fileobj.old_annotated_write = new_fileobj.write
 
-  def flush(self):
-    pass
+  def annotated_write(out):
+    index = getattr(threading.currentThread(), 'index', None)
+    if index is None:
+      # Undexed threads aren't buffered.
+      new_fileobj.old_annotated_write(out)
+      return
 
-  def full_flush(self):
-    if self.buf:
-      self.stdout.write(self.prepend + self.buf)
-      self.stdout.flush()
-      self.buf = ''
+    new_fileobj.lock.acquire()
+    try:
+      # Use a dummy array to hold the string so the code can be lockless.
+      # Strings are immutable, requiring to keep a lock for the whole dictionary
+      # otherwise. Using an array is faster than using a dummy object.
+      if not index in new_fileobj.output_buffers:
+        obj = new_fileobj.output_buffers[index] = ['']
+      else:
+        obj = new_fileobj.output_buffers[index]
+    finally:
+      new_fileobj.lock.release()
+
+    # Continue lockless.
+    obj[0] += out
+    while '\n' in obj[0]:
+      line, remaining = obj[0].split('\n', 1)
+      new_fileobj.old_annotated_write('%d>%s\n' % (index, line))
+      obj[0] = remaining
+
+  def full_flush():
+    """Flush buffered output."""
+    orphans = []
+    new_fileobj.lock.acquire()
+    try:
+      # Detect threads no longer existing.
+      indexes = (getattr(t, 'index', None) for t in threading.enumerate())
+      indexed = filter(None, indexes)
+      for index in new_fileobj.output_buffers:
+        if not index in indexes:
+          orphans.append((index, new_fileobj.output_buffers[index][0]))
+      for orphan in orphans:
+        del new_fileobj.output_buffers[orphan[0]]
+    finally:
+      new_fileobj.lock.release()
+
+    # Don't keep the lock while writting. Will append \n when it shouldn't.
+    for orphan in orphans:
+      new_fileobj.old_annotated_write('%d>%s\n' % (orphan[0], orphan[1]))
+
+  new_fileobj.write = annotated_write
+  new_fileobj.full_flush = full_flush
+  return new_fileobj
 
 
 def CheckCallAndFilter(args, stdout=None, filter_fn=None,
@@ -628,12 +672,10 @@
     if self.jobs > 1:
       # Start the thread.
       index = len(self.ran) + len(self.running) + 1
-      # Copy 'options' and add annotated stdout.
+      # Copy 'options'.
       task_kwargs = kwargs.copy()
       task_kwargs['options'] = copy.copy(task_kwargs['options'])
-      task_kwargs['options'].stdout = StdoutAnnotated(
-          '%d>' % index, task_kwargs['options'].stdout)
-      new_thread = self._Worker(task_item, args, task_kwargs)
+      new_thread = self._Worker(task_item, index, args, task_kwargs)
       self.running.append(new_thread)
       new_thread.start()
     else:
@@ -646,10 +688,11 @@
 
   class _Worker(threading.Thread):
     """One thread to execute one WorkItem."""
-    def __init__(self, item, args, kwargs):
+    def __init__(self, item, index, args, kwargs):
       threading.Thread.__init__(self, name=item.name or 'Worker')
       logging.info(item.name)
       self.item = item
+      self.index = index
       self.args = args
       self.kwargs = kwargs