grep: add --jobs support

Use multiprocessing to run in parallel.  When operating on multiple
projects, this can greatly speed things up.  Across 1000 repos, it
goes from ~40sec to ~16sec with the default -j8.

The output processing does not appear to be a significant bottleneck
-- it accounts for <1sec out of the ~16sec runtime.  Thus we leave it
in the main thread to simplify the code.

Change-Id: I750b72c7711b0c5d26e65d480738fbaac3a69971
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297984
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
diff --git a/subcmds/grep.py b/subcmds/grep.py
index c16d418..49feaf6 100644
--- a/subcmds/grep.py
+++ b/subcmds/grep.py
@@ -12,10 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import functools
+import multiprocessing
 import sys
 
 from color import Coloring
-from command import PagedCommand
+from command import DEFAULT_LOCAL_JOBS, PagedCommand, WORKER_BATCH_SIZE
 from error import GitError
 from git_command import GitCommand
 
@@ -61,6 +63,7 @@
   repo grep --all-match -e NODE -e Unexpected
 
 """
+  PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
 
   @staticmethod
   def _carry_option(_option, opt_str, value, parser):
@@ -80,6 +83,7 @@
       pt.append(value)
 
   def _Options(self, p):
+    super()._Options(p)
     g = p.add_option_group('Sources')
     g.add_option('--cached',
                  action='callback', callback=self._carry_option,
@@ -152,6 +156,72 @@
                  action='callback', callback=self._carry_option,
                  help='Show only file names not containing matching lines')
 
+  def _ExecuteOne(self, cmd_argv, project):
+    """Process one project."""
+    try:
+      p = GitCommand(project,
+                     cmd_argv,
+                     bare=False,
+                     capture_stdout=True,
+                     capture_stderr=True)
+    except GitError as e:
+      return (project, -1, None, str(e))
+
+    return (project, p.Wait(), p.stdout, p.stderr)
+
+  @staticmethod
+  def _ProcessResults(out, full_name, have_rev, results):
+    git_failed = False
+    bad_rev = False
+    have_match = False
+
+    for project, rc, stdout, stderr in results:
+      if rc < 0:
+        git_failed = True
+        out.project('--- project %s ---' % project.relpath)
+        out.nl()
+        out.fail('%s', stderr)
+        out.nl()
+        continue
+
+      if rc:
+        # no results
+        if stderr:
+          if have_rev and 'fatal: ambiguous argument' in stderr:
+            bad_rev = True
+          else:
+            out.project('--- project %s ---' % project.relpath)
+            out.nl()
+            out.fail('%s', stderr.strip())
+            out.nl()
+        continue
+      have_match = True
+
+      # We cut the last element, to avoid a blank line.
+      r = stdout.split('\n')
+      r = r[0:-1]
+
+      if have_rev and full_name:
+        for line in r:
+          rev, line = line.split(':', 1)
+          out.write("%s", rev)
+          out.write(':')
+          out.project(project.relpath)
+          out.write('/')
+          out.write("%s", line)
+          out.nl()
+      elif full_name:
+        for line in r:
+          out.project(project.relpath)
+          out.write('/')
+          out.write("%s", line)
+          out.nl()
+      else:
+        for line in r:
+          print(line)
+
+    return (git_failed, bad_rev, have_match)
+
   def Execute(self, opt, args):
     out = GrepColoring(self.manifest.manifestProject.config)
 
@@ -183,62 +253,18 @@
       cmd_argv.extend(opt.revision)
     cmd_argv.append('--')
 
-    git_failed = False
-    bad_rev = False
-    have_match = False
-
-    for project in projects:
-      try:
-        p = GitCommand(project,
-                       cmd_argv,
-                       bare=False,
-                       capture_stdout=True,
-                       capture_stderr=True)
-      except GitError as e:
-        git_failed = True
-        out.project('--- project %s ---' % project.relpath)
-        out.nl()
-        out.fail('%s', str(e))
-        out.nl()
-        continue
-
-      if p.Wait() != 0:
-        # no results
-        #
-        if p.stderr:
-          if have_rev and 'fatal: ambiguous argument' in p.stderr:
-            bad_rev = True
-          else:
-            out.project('--- project %s ---' % project.relpath)
-            out.nl()
-            out.fail('%s', p.stderr.strip())
-            out.nl()
-        continue
-      have_match = True
-
-      # We cut the last element, to avoid a blank line.
-      #
-      r = p.stdout.split('\n')
-      r = r[0:-1]
-
-      if have_rev and full_name:
-        for line in r:
-          rev, line = line.split(':', 1)
-          out.write("%s", rev)
-          out.write(':')
-          out.project(project.relpath)
-          out.write('/')
-          out.write("%s", line)
-          out.nl()
-      elif full_name:
-        for line in r:
-          out.project(project.relpath)
-          out.write('/')
-          out.write("%s", line)
-          out.nl()
-      else:
-        for line in r:
-          print(line)
+    process_results = functools.partial(
+        self._ProcessResults, out, full_name, have_rev)
+    # NB: Multiprocessing is heavy, so don't spin it up for one job.
+    if len(projects) == 1 or opt.jobs == 1:
+      git_failed, bad_rev, have_match = process_results(
+          self._ExecuteOne(cmd_argv, x) for x in projects)
+    else:
+      with multiprocessing.Pool(opt.jobs) as pool:
+        results = pool.imap(
+            functools.partial(self._ExecuteOne, cmd_argv), projects,
+            chunksize=WORKER_BATCH_SIZE)
+        git_failed, bad_rev, have_match = process_results(results)
 
     if git_failed:
       sys.exit(1)