command: add a helper for the parallel execution boilerplate

Now that a number of subcommands perform parallel execution, a common
pattern has emerged that we can factor out for most of them.  We leave
forall alone for now, as it is too complicated to cut over at the moment.

Change-Id: I3617a4f7c66142bcd1ab030cb4cca698a65010ac
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/301942
Tested-by: Mike Frysinger <vapier@google.com>
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
diff --git a/subcmds/diff.py b/subcmds/diff.py
index cdc262e..4966bb1 100644
--- a/subcmds/diff.py
+++ b/subcmds/diff.py
@@ -14,9 +14,8 @@
 
 import functools
 import io
-import multiprocessing
 
-from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
+from command import DEFAULT_LOCAL_JOBS, PagedCommand
 
 
 class Diff(PagedCommand):
@@ -36,7 +35,7 @@
                  dest='absolute', action='store_true',
                  help='Paths are relative to the repository root')
 
-  def _DiffHelper(self, absolute, project):
+  def _ExecuteOne(self, absolute, project):
     """Obtains the diff for a specific project.
 
     Args:
@@ -51,22 +50,20 @@
     return (ret, buf.getvalue())
 
   def Execute(self, opt, args):
-    ret = 0
     all_projects = self.GetProjects(args)
 
-    # NB: Multiprocessing is heavy, so don't spin it up for one job.
-    if len(all_projects) == 1 or opt.jobs == 1:
-      for project in all_projects:
-        if not project.PrintWorkTreeDiff(opt.absolute):
+    def _ProcessResults(_pool, _output, results):
+      ret = 0
+      for (state, output) in results:
+        if output:
+          print(output, end='')
+        if not state:
           ret = 1
-    else:
-      with multiprocessing.Pool(opt.jobs) as pool:
-        states = pool.imap(functools.partial(self._DiffHelper, opt.absolute),
-                           all_projects, WORKER_BATCH_SIZE)
-        for (state, output) in states:
-          if output:
-            print(output, end='')
-          if not state:
-            ret = 1
+      return ret
 
-    return ret
+    return self.ExecuteInParallel(
+        opt.jobs,
+        functools.partial(self._ExecuteOne, opt.absolute),
+        all_projects,
+        callback=_ProcessResults,
+        ordered=True)