git cl status: use smarter parallel processing.

Previously, one CL was queried first, followed by all remaining ones
in parallel. The purpose was to ensure that a valid authentication
token was present. However, this still required one serial call to
Rietveld or Gerrit, needlessly increasing latency.

This change first loops over all CLs and ensures that credentials are
present, refreshing access tokens for Rietveld if necessary. Then,
all CLs are queried in parallel.

R=sergiyb@chromium.org,clemensh@chromium.org
BUG=681704

Change-Id: Ic125ac7c2a684d6f3c34e4e8b899192abbed01bb
Reviewed-on: https://chromium-review.googlesource.com/431033
Reviewed-by: Sergiy Byelozyorov <sergiyb@chromium.org>
Commit-Queue: Andrii Shyshkalov <tandrii@chromium.org>
diff --git a/git_cl.py b/git_cl.py
index 3b92b90..d86902c 100755
--- a/git_cl.py
+++ b/git_cl.py
@@ -1799,11 +1799,13 @@
     failed."""
     raise NotImplementedError()
 
-  def EnsureAuthenticated(self, force):
+  def EnsureAuthenticated(self, force, refresh=False):
     """Best effort check that user is authenticated with codereview server.
 
     Arguments:
       force: whether to skip confirmation questions.
+      refresh: whether to attempt to refresh credentials. Ignored if not
+        applicable.
     """
     raise NotImplementedError()
 
@@ -1852,13 +1854,15 @@
         self._rietveld_server = settings.GetDefaultServerUrl()
     return self._rietveld_server
 
-  def EnsureAuthenticated(self, force):
+  def EnsureAuthenticated(self, force, refresh=False):
     """Best effort check that user is authenticated with Rietveld server."""
     if self._auth_config.use_oauth2:
       authenticator = auth.get_authenticator_for_host(
           self.GetCodereviewServer(), self._auth_config)
       if not authenticator.has_cached_credentials():
         raise auth.LoginRequiredError(self.GetCodereviewServer())
+      if refresh:
+        authenticator.get_access_token()
 
   def FetchDescription(self):
     issue = self.GetIssue()
@@ -2315,7 +2319,7 @@
   def CodereviewServerConfigKey(cls):
     return 'gerritserver'
 
-  def EnsureAuthenticated(self, force):
+  def EnsureAuthenticated(self, force, refresh=None):
     """Best effort check that user is authenticated with Gerrit server."""
     if settings.GetGerritSkipEnsureAuthenticated():
       # For projects with unusual authentication schemes.
@@ -3469,48 +3473,51 @@
   # Silence upload.py otherwise it becomes unwieldy.
   upload.verbosity = 0
 
-  if fine_grained:
-    # Process one branch synchronously to work through authentication, then
-    # spawn processes to process all the other branches in parallel.
-    if changes:
-      def fetch(cl):
-        try:
-          return (cl, cl.GetStatus())
-        except:
-          # See http://crbug.com/629863.
-          logging.exception('failed to fetch status for %s:', cl)
-          raise
-      yield fetch(changes[0])
+  if not changes:
+    raise StopIteration()
 
-      changes_to_fetch = changes[1:]
-      if not changes_to_fetch:
-        # Exit early if there was only one branch to fetch.
-        return
-
-      pool = ThreadPool(
-          min(max_processes, len(changes_to_fetch))
-              if max_processes is not None
-              else max(len(changes_to_fetch), 1))
-
-      fetched_cls = set()
-      it = pool.imap_unordered(fetch, changes_to_fetch).__iter__()
-      while True:
-        try:
-          row = it.next(timeout=5)
-        except multiprocessing.TimeoutError:
-          break
-
-        fetched_cls.add(row[0])
-        yield row
-
-      # Add any branches that failed to fetch.
-      for cl in set(changes_to_fetch) - fetched_cls:
-        yield (cl, 'error')
-
-  else:
+  if not fine_grained:
+    # Fast path which doesn't involve querying codereview servers.
     # Do not use GetApprovingReviewers(), since it requires an HTTP request.
     for cl in changes:
       yield (cl, 'waiting' if cl.GetIssueURL() else 'error')
+    return
+
+  # First, sort out authentication issues.
+  logging.debug('ensuring credentials exist')
+  for cl in changes:
+    cl.EnsureAuthenticated(force=False, refresh=True)
+
+  def fetch(cl):
+    try:
+      return (cl, cl.GetStatus())
+    except:
+      # See http://crbug.com/629863.
+      logging.exception('failed to fetch status for %s:', cl)
+      raise
+
+  threads_count = len(changes)
+  if max_processes:
+    threads_count = max(1, min(threads_count, max_processes))
+  logging.debug('querying %d CLs using %d threads', len(changes), threads_count)
+
+  pool = ThreadPool(threads_count)
+  fetched_cls = set()
+  try:
+    it = pool.imap_unordered(fetch, changes).__iter__()
+    while True:
+      try:
+        cl, status = it.next(timeout=5)
+      except multiprocessing.TimeoutError:
+        break
+      fetched_cls.add(cl)
+      yield cl, status
+  finally:
+    pool.close()
+
+  # Add any branches that failed to fetch.
+  for cl in set(changes) - fetched_cls:
+    yield (cl, 'error')
 
 
 def upload_branch_deps(cl, args):