command: add a helper for the parallel execution boilerplate

Now that we have a bunch of subcommands doing parallel execution, a
common pattern arises that we can factor out for most of them.  We
leave forall alone as it's a bit too complicated atm to cut over.

Change-Id: I3617a4f7c66142bcd1ab030cb4cca698a65010ac
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/301942
Tested-by: Mike Frysinger <vapier@google.com>
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
diff --git a/command.py b/command.py
index be2d6a6..9b1220d 100644
--- a/command.py
+++ b/command.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import multiprocessing
 import os
 import optparse
 import platform
@@ -21,6 +22,7 @@
 from event_log import EventLog
 from error import NoSuchProjectError
 from error import InvalidProjectGroupsError
+import progress
 
 
 # Number of projects to submit to a single worker process at a time.
@@ -156,6 +158,44 @@
     """
     raise NotImplementedError
 
+  @staticmethod
+  def ExecuteInParallel(jobs, func, inputs, callback, output=None, ordered=False):
+    """Helper for managing parallel execution boilerplate.
+
+    For subcommands that can easily split their work up.
+
+    Args:
+      jobs: How many parallel processes to use.
+      func: The function to apply to each of the |inputs|.  Usually a
+          functools.partial for wrapping additional arguments.  It will be run
+          in a separate process, so it must be picklable, so nested functions
+          won't work.  Methods on the subcommand Command class should work.
+      inputs: The list of items to process.  Must be a list.
+      callback: The function to pass the results to for processing.  It will be
+          executed in the main thread and process the results of |func| as they
+          become available.  Thus it may be a local nested function.  Its return
+          value is passed back directly.  It takes three arguments:
+          - The processing pool (or None with one job).
+          - The |output| argument.
+          - An iterator for the results.
+      output: An output manager.  May be progress.Progress or color.Coloring.
+      ordered: Whether the jobs should be processed in order.
+
+    Returns:
+      The |callback| function's results are returned.
+    """
+    try:
+      # NB: Multiprocessing is heavy, so don't spin it up for one job.
+      if len(inputs) == 1 or jobs == 1:
+        return callback(None, output, (func(x) for x in inputs))
+      else:
+        with multiprocessing.Pool(jobs) as pool:
+          submit = pool.imap if ordered else pool.imap_unordered
+          return callback(pool, output, submit(func, inputs, chunksize=WORKER_BATCH_SIZE))
+    finally:
+      if isinstance(output, progress.Progress):
+        output.end()
+
   def _ResetPathToProjectMap(self, projects):
     self._by_path = dict((p.worktree, p) for p in projects)