subcmds: reduce multiprocessing serialization overhead

Follow the same approach as 39ffd9977e to reduce serialization overhead.

Below benchmarks are tested with 2.7k projects on my workstation
(warm cache). git tracing is disabled for benchmark.

(seconds)              | v2.48 | v2.48 | this CL | this CL
	               |       |  -j32 |         |    -j32
-----------------------------------------------------------
with clean tree state:
branches (none)        |   5.6 |   5.9 |    1.0  |    0.9
status (clean)         |  21.3 |   9.4 |   19.4  |    4.7
diff (none)            |   7.6 |   7.2 |    5.7  |    2.2
prune (none)           |   5.7 |   6.1 |    1.3  |    1.2
abandon (none)         |  19.4 |  18.6 |    0.9  |    0.8
upload (none)          |  19.7 |  18.7 |    0.9  |    0.8
forall -c true         |   7.5 |   7.6 |    0.6  |    0.6
forall -c "git log -1" |  11.3 |  11.1 |    0.6  |    0.6

with branches:
start BRANCH --all     |  21.9 |  20.3 |   13.6  |    2.6
checkout BRANCH        |  29.1 |  27.8 |    1.1  |    1.0
branches (2)           |  28.0 |  28.6 |    1.5  |    1.3
abandon BRANCH         |  29.2 |  27.5 |    9.7  |    2.2

Bug: b/371638995
Change-Id: I53989a3d1e43063587b3f52f852b1c2c56b49412
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/440221
Reviewed-by: Josip Sokcevic <sokcevic@google.com>
Tested-by: Kuang-che Wu <kcwu@google.com>
Commit-Queue: Kuang-che Wu <kcwu@google.com>
diff --git a/command.py b/command.py
index 2a2ce13..22115ac 100644
--- a/command.py
+++ b/command.py
@@ -268,8 +268,10 @@
             cls._parallel_context = None
 
     @classmethod
-    def _SetParallelContext(cls, context):
+    def _InitParallelWorker(cls, context, initializer):
         cls._parallel_context = context
+        if initializer:
+            initializer()
 
     @classmethod
     def ExecuteInParallel(
@@ -281,6 +283,7 @@
         output=None,
         ordered=False,
         chunksize=WORKER_BATCH_SIZE,
+        initializer=None,
     ):
         """Helper for managing parallel execution boiler plate.
 
@@ -307,6 +310,7 @@
             ordered: Whether the jobs should be processed in order.
             chunksize: The number of jobs processed in batch by parallel
                 workers.
+            initializer: Worker initializer.
 
         Returns:
             The |callback| function's results are returned.
@@ -318,8 +322,8 @@
             else:
                 with multiprocessing.Pool(
                     jobs,
-                    initializer=cls._SetParallelContext,
-                    initargs=(cls._parallel_context,),
+                    initializer=cls._InitParallelWorker,
+                    initargs=(cls._parallel_context, initializer),
                 ) as pool:
                     submit = pool.imap if ordered else pool.imap_unordered
                     return callback(
diff --git a/subcmds/abandon.py b/subcmds/abandon.py
index e280d69..3208be6 100644
--- a/subcmds/abandon.py
+++ b/subcmds/abandon.py
@@ -70,8 +70,10 @@
         else:
             args.insert(0, "'All local branches'")
 
-    def _ExecuteOne(self, all_branches, nb, project):
+    @classmethod
+    def _ExecuteOne(cls, all_branches, nb, project_idx):
         """Abandon one project."""
+        project = cls.get_parallel_context()["projects"][project_idx]
         if all_branches:
             branches = project.GetBranches()
         else:
@@ -89,7 +91,7 @@
             if status is not None:
                 ret[name] = status
 
-        return (ret, project, errors)
+        return (ret, project_idx, errors)
 
     def Execute(self, opt, args):
         nb = args[0].split()
@@ -102,7 +104,8 @@
         _RelPath = lambda p: p.RelPath(local=opt.this_manifest_only)
 
         def _ProcessResults(_pool, pm, states):
-            for results, project, errors in states:
+            for results, project_idx, errors in states:
+                project = all_projects[project_idx]
                 for branch, status in results.items():
                     if status:
                         success[branch].append(project)
@@ -111,15 +114,18 @@
                 aggregate_errors.extend(errors)
                 pm.update(msg="")
 
-        self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(self._ExecuteOne, opt.all, nb),
-            all_projects,
-            callback=_ProcessResults,
-            output=Progress(
-                f"Abandon {nb}", len(all_projects), quiet=opt.quiet
-            ),
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = all_projects
+            self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(self._ExecuteOne, opt.all, nb),
+                range(len(all_projects)),
+                callback=_ProcessResults,
+                output=Progress(
+                    f"Abandon {nb}", len(all_projects), quiet=opt.quiet
+                ),
+                chunksize=1,
+            )
 
         width = max(
             itertools.chain(
diff --git a/subcmds/branches.py b/subcmds/branches.py
index 59b5cb2..08c6389 100644
--- a/subcmds/branches.py
+++ b/subcmds/branches.py
@@ -98,6 +98,22 @@
 """
     PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
 
+    @classmethod
+    def _ExpandProjectToBranches(cls, project_idx):
+        """Expands a project into a list of branch names & associated info.
+
+        Args:
+            project_idx: project.Project index
+
+        Returns:
+            List[Tuple[str, git_config.Branch, int]]
+        """
+        branches = []
+        project = cls.get_parallel_context()["projects"][project_idx]
+        for name, b in project.GetBranches().items():
+            branches.append((name, b, project_idx))
+        return branches
+
     def Execute(self, opt, args):
         projects = self.GetProjects(
             args, all_manifests=not opt.this_manifest_only
@@ -107,17 +123,20 @@
         project_cnt = len(projects)
 
         def _ProcessResults(_pool, _output, results):
-            for name, b in itertools.chain.from_iterable(results):
+            for name, b, project_idx in itertools.chain.from_iterable(results):
+                b.project = projects[project_idx]
                 if name not in all_branches:
                     all_branches[name] = BranchInfo(name)
                 all_branches[name].add(b)
 
-        self.ExecuteInParallel(
-            opt.jobs,
-            expand_project_to_branches,
-            projects,
-            callback=_ProcessResults,
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            self.ExecuteInParallel(
+                opt.jobs,
+                self._ExpandProjectToBranches,
+                range(len(projects)),
+                callback=_ProcessResults,
+            )
 
         names = sorted(all_branches)
 
@@ -191,19 +210,3 @@
             else:
                 out.write(" in all projects")
             out.nl()
-
-
-def expand_project_to_branches(project):
-    """Expands a project into a list of branch names & associated information.
-
-    Args:
-        project: project.Project
-
-    Returns:
-        List[Tuple[str, git_config.Branch]]
-    """
-    branches = []
-    for name, b in project.GetBranches().items():
-        b.project = project
-        branches.append((name, b))
-    return branches
diff --git a/subcmds/checkout.py b/subcmds/checkout.py
index 379bfa1..859ddf6 100644
--- a/subcmds/checkout.py
+++ b/subcmds/checkout.py
@@ -20,7 +20,6 @@
 from error import GitError
 from error import RepoExitError
 from progress import Progress
-from project import Project
 from repo_logging import RepoLogger
 
 
@@ -30,7 +29,7 @@
 class CheckoutBranchResult(NamedTuple):
     # Whether the Project is on the branch (i.e. branch exists and no errors)
     result: bool
-    project: Project
+    project_idx: int
     error: Exception
 
 
@@ -62,15 +61,17 @@
         if not args:
             self.Usage()
 
-    def _ExecuteOne(self, nb, project):
+    @classmethod
+    def _ExecuteOne(cls, nb, project_idx):
         """Checkout one project."""
         error = None
         result = None
+        project = cls.get_parallel_context()["projects"][project_idx]
         try:
             result = project.CheckoutBranch(nb)
         except GitError as e:
             error = e
-        return CheckoutBranchResult(result, project, error)
+        return CheckoutBranchResult(result, project_idx, error)
 
     def Execute(self, opt, args):
         nb = args[0]
@@ -83,22 +84,25 @@
 
         def _ProcessResults(_pool, pm, results):
             for result in results:
+                project = all_projects[result.project_idx]
                 if result.error is not None:
                     err.append(result.error)
-                    err_projects.append(result.project)
+                    err_projects.append(project)
                 elif result.result:
-                    success.append(result.project)
+                    success.append(project)
                 pm.update(msg="")
 
-        self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(self._ExecuteOne, nb),
-            all_projects,
-            callback=_ProcessResults,
-            output=Progress(
-                f"Checkout {nb}", len(all_projects), quiet=opt.quiet
-            ),
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = all_projects
+            self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(self._ExecuteOne, nb),
+                range(len(all_projects)),
+                callback=_ProcessResults,
+                output=Progress(
+                    f"Checkout {nb}", len(all_projects), quiet=opt.quiet
+                ),
+            )
 
         if err_projects:
             for p in err_projects:
diff --git a/subcmds/diff.py b/subcmds/diff.py
index d9d72b4..7bb0cbb 100644
--- a/subcmds/diff.py
+++ b/subcmds/diff.py
@@ -40,7 +40,8 @@
             help="paths are relative to the repository root",
         )
 
-    def _ExecuteOne(self, absolute, local, project):
+    @classmethod
+    def _ExecuteOne(cls, absolute, local, project_idx):
         """Obtains the diff for a specific project.
 
         Args:
@@ -48,12 +49,13 @@
             local: a boolean, if True, the path is relative to the local
                 (sub)manifest.  If false, the path is relative to the outermost
                 manifest.
-            project: Project to get status of.
+            project_idx: Project index to get status of.
 
         Returns:
             The status of the project.
         """
         buf = io.StringIO()
+        project = cls.get_parallel_context()["projects"][project_idx]
         ret = project.PrintWorkTreeDiff(absolute, output_redir=buf, local=local)
         return (ret, buf.getvalue())
 
@@ -71,12 +73,15 @@
                     ret = 1
             return ret
 
-        return self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(
-                self._ExecuteOne, opt.absolute, opt.this_manifest_only
-            ),
-            all_projects,
-            callback=_ProcessResults,
-            ordered=True,
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = all_projects
+            return self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(
+                    self._ExecuteOne, opt.absolute, opt.this_manifest_only
+                ),
+                range(len(all_projects)),
+                callback=_ProcessResults,
+                ordered=True,
+                chunksize=1,
+            )
diff --git a/subcmds/forall.py b/subcmds/forall.py
index 287f2e0..e5fc9e8 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -15,7 +15,6 @@
 import errno
 import functools
 import io
-import multiprocessing
 import os
 import re
 import signal
@@ -26,7 +25,6 @@
 from command import Command
 from command import DEFAULT_LOCAL_JOBS
 from command import MirrorSafeCommand
-from command import WORKER_BATCH_SIZE
 from error import ManifestInvalidRevisionError
 from repo_logging import RepoLogger
 
@@ -241,7 +239,6 @@
                     cmd.insert(cmd.index(cn) + 1, "--color")
 
         mirror = self.manifest.IsMirror
-        rc = 0
 
         smart_sync_manifest_name = "smart_sync_override.xml"
         smart_sync_manifest_path = os.path.join(
@@ -264,32 +261,41 @@
 
         os.environ["REPO_COUNT"] = str(len(projects))
 
+        def _ProcessResults(_pool, _output, results):
+            rc = 0
+            first = True
+            for r, output in results:
+                if output:
+                    if first:
+                        first = False
+                    elif opt.project_header:
+                        print()
+                    # To simplify the DoWorkWrapper, take care of automatic
+                    # newlines.
+                    end = "\n"
+                    if output[-1] == "\n":
+                        end = ""
+                    print(output, end=end)
+                rc = rc or r
+                if r != 0 and opt.abort_on_errors:
+                    raise Exception("Aborting due to previous error")
+            return rc
+
         try:
             config = self.manifest.manifestProject.config
-            with multiprocessing.Pool(opt.jobs, InitWorker) as pool:
-                results_it = pool.imap(
+            with self.ParallelContext():
+                self.get_parallel_context()["projects"] = projects
+                rc = self.ExecuteInParallel(
+                    opt.jobs,
                     functools.partial(
-                        DoWorkWrapper, mirror, opt, cmd, shell, config
+                        self.DoWorkWrapper, mirror, opt, cmd, shell, config
                     ),
-                    enumerate(projects),
-                    chunksize=WORKER_BATCH_SIZE,
+                    range(len(projects)),
+                    callback=_ProcessResults,
+                    ordered=True,
+                    initializer=self.InitWorker,
+                    chunksize=1,
                 )
-                first = True
-                for r, output in results_it:
-                    if output:
-                        if first:
-                            first = False
-                        elif opt.project_header:
-                            print()
-                        # To simplify the DoWorkWrapper, take care of automatic
-                        # newlines.
-                        end = "\n"
-                        if output[-1] == "\n":
-                            end = ""
-                        print(output, end=end)
-                    rc = rc or r
-                    if r != 0 and opt.abort_on_errors:
-                        raise Exception("Aborting due to previous error")
         except (KeyboardInterrupt, WorkerKeyboardInterrupt):
             # Catch KeyboardInterrupt raised inside and outside of workers
             rc = rc or errno.EINTR
@@ -304,31 +310,31 @@
         if rc != 0:
             sys.exit(rc)
 
+    @classmethod
+    def InitWorker(cls):
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    @classmethod
+    def DoWorkWrapper(cls, mirror, opt, cmd, shell, config, project_idx):
+        """A wrapper around the DoWork() method.
+
+        Catch the KeyboardInterrupt exceptions here and re-raise them as a
+        different, ``Exception``-based exception to stop it flooding the console
+        with stacktraces and making the parent hang indefinitely.
+
+        """
+        project = cls.get_parallel_context()["projects"][project_idx]
+        try:
+            return DoWork(project, mirror, opt, cmd, shell, project_idx, config)
+        except KeyboardInterrupt:
+            print("%s: Worker interrupted" % project.name)
+            raise WorkerKeyboardInterrupt()
+
 
 class WorkerKeyboardInterrupt(Exception):
     """Keyboard interrupt exception for worker processes."""
 
 
-def InitWorker():
-    signal.signal(signal.SIGINT, signal.SIG_IGN)
-
-
-def DoWorkWrapper(mirror, opt, cmd, shell, config, args):
-    """A wrapper around the DoWork() method.
-
-    Catch the KeyboardInterrupt exceptions here and re-raise them as a
-    different, ``Exception``-based exception to stop it flooding the console
-    with stacktraces and making the parent hang indefinitely.
-
-    """
-    cnt, project = args
-    try:
-        return DoWork(project, mirror, opt, cmd, shell, cnt, config)
-    except KeyboardInterrupt:
-        print("%s: Worker interrupted" % project.name)
-        raise WorkerKeyboardInterrupt()
-
-
 def DoWork(project, mirror, opt, cmd, shell, cnt, config):
     env = os.environ.copy()
 
diff --git a/subcmds/grep.py b/subcmds/grep.py
index b677b6b..918651d 100644
--- a/subcmds/grep.py
+++ b/subcmds/grep.py
@@ -23,7 +23,6 @@
 from error import InvalidArgumentsError
 from error import SilentRepoExitError
 from git_command import GitCommand
-from project import Project
 from repo_logging import RepoLogger
 
 
@@ -40,7 +39,7 @@
 class ExecuteOneResult(NamedTuple):
     """Result from an execute instance."""
 
-    project: Project
+    project_idx: int
     rc: int
     stdout: str
     stderr: str
@@ -262,8 +261,10 @@
             help="Show only file names not containing matching lines",
         )
 
-    def _ExecuteOne(self, cmd_argv, project):
+    @classmethod
+    def _ExecuteOne(cls, cmd_argv, project_idx):
         """Process one project."""
+        project = cls.get_parallel_context()["projects"][project_idx]
         try:
             p = GitCommand(
                 project,
@@ -274,7 +275,7 @@
                 verify_command=True,
             )
         except GitError as e:
-            return ExecuteOneResult(project, -1, None, str(e), e)
+            return ExecuteOneResult(project_idx, -1, None, str(e), e)
 
         try:
             error = None
@@ -282,10 +283,12 @@
         except GitError as e:
             rc = 1
             error = e
-        return ExecuteOneResult(project, rc, p.stdout, p.stderr, error)
+        return ExecuteOneResult(project_idx, rc, p.stdout, p.stderr, error)
 
     @staticmethod
-    def _ProcessResults(full_name, have_rev, opt, _pool, out, results):
+    def _ProcessResults(
+        full_name, have_rev, opt, projects, _pool, out, results
+    ):
         git_failed = False
         bad_rev = False
         have_match = False
@@ -293,9 +296,10 @@
         errors = []
 
         for result in results:
+            project = projects[result.project_idx]
             if result.rc < 0:
                 git_failed = True
-                out.project("--- project %s ---" % _RelPath(result.project))
+                out.project("--- project %s ---" % _RelPath(project))
                 out.nl()
                 out.fail("%s", result.stderr)
                 out.nl()
@@ -311,9 +315,7 @@
                     ):
                         bad_rev = True
                     else:
-                        out.project(
-                            "--- project %s ---" % _RelPath(result.project)
-                        )
+                        out.project("--- project %s ---" % _RelPath(project))
                         out.nl()
                         out.fail("%s", result.stderr.strip())
                         out.nl()
@@ -331,13 +333,13 @@
                     rev, line = line.split(":", 1)
                     out.write("%s", rev)
                     out.write(":")
-                    out.project(_RelPath(result.project))
+                    out.project(_RelPath(project))
                     out.write("/")
                     out.write("%s", line)
                     out.nl()
             elif full_name:
                 for line in r:
-                    out.project(_RelPath(result.project))
+                    out.project(_RelPath(project))
                     out.write("/")
                     out.write("%s", line)
                     out.nl()
@@ -381,16 +383,19 @@
             cmd_argv.extend(opt.revision)
         cmd_argv.append("--")
 
-        git_failed, bad_rev, have_match, errors = self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(self._ExecuteOne, cmd_argv),
-            projects,
-            callback=functools.partial(
-                self._ProcessResults, full_name, have_rev, opt
-            ),
-            output=out,
-            ordered=True,
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            git_failed, bad_rev, have_match, errors = self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(self._ExecuteOne, cmd_argv),
+                range(len(projects)),
+                callback=functools.partial(
+                    self._ProcessResults, full_name, have_rev, opt, projects
+                ),
+                output=out,
+                ordered=True,
+                chunksize=1,
+            )
 
         if git_failed:
             raise GrepCommandError(
diff --git a/subcmds/prune.py b/subcmds/prune.py
index f99082a..18bfc68 100644
--- a/subcmds/prune.py
+++ b/subcmds/prune.py
@@ -27,8 +27,10 @@
 """
     PARALLEL_JOBS = DEFAULT_LOCAL_JOBS
 
-    def _ExecuteOne(self, project):
+    @classmethod
+    def _ExecuteOne(cls, project_idx):
         """Process one project."""
+        project = cls.get_parallel_context()["projects"][project_idx]
         return project.PruneHeads()
 
     def Execute(self, opt, args):
@@ -41,13 +43,15 @@
         def _ProcessResults(_pool, _output, results):
             return list(itertools.chain.from_iterable(results))
 
-        all_branches = self.ExecuteInParallel(
-            opt.jobs,
-            self._ExecuteOne,
-            projects,
-            callback=_ProcessResults,
-            ordered=True,
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            all_branches = self.ExecuteInParallel(
+                opt.jobs,
+                self._ExecuteOne,
+                range(len(projects)),
+                callback=_ProcessResults,
+                ordered=True,
+            )
 
         if not all_branches:
             return
diff --git a/subcmds/start.py b/subcmds/start.py
index 56008f4..6dca7e4 100644
--- a/subcmds/start.py
+++ b/subcmds/start.py
@@ -21,7 +21,6 @@
 from git_command import git
 from git_config import IsImmutable
 from progress import Progress
-from project import Project
 from repo_logging import RepoLogger
 
 
@@ -29,7 +28,7 @@
 
 
 class ExecuteOneResult(NamedTuple):
-    project: Project
+    project_idx: int
     error: Exception
 
 
@@ -80,18 +79,20 @@
         if not git.check_ref_format("heads/%s" % nb):
             self.OptionParser.error("'%s' is not a valid name" % nb)
 
-    def _ExecuteOne(self, revision, nb, project):
+    @classmethod
+    def _ExecuteOne(cls, revision, nb, default_revisionExpr, project_idx):
         """Start one project."""
         # If the current revision is immutable, such as a SHA1, a tag or
         # a change, then we can't push back to it. Substitute with
         # dest_branch, if defined; or with manifest default revision instead.
         branch_merge = ""
         error = None
+        project = cls.get_parallel_context()["projects"][project_idx]
         if IsImmutable(project.revisionExpr):
             if project.dest_branch:
                 branch_merge = project.dest_branch
             else:
-                branch_merge = self.manifest.default.revisionExpr
+                branch_merge = default_revisionExpr
 
         try:
             project.StartBranch(
@@ -100,7 +101,7 @@
         except Exception as e:
             logger.error("error: unable to checkout %s: %s", project.name, e)
             error = e
-        return ExecuteOneResult(project, error)
+        return ExecuteOneResult(project_idx, error)
 
     def Execute(self, opt, args):
         nb = args[0]
@@ -120,19 +121,28 @@
         def _ProcessResults(_pool, pm, results):
             for result in results:
                 if result.error:
-                    err_projects.append(result.project)
+                    project = all_projects[result.project_idx]
+                    err_projects.append(project)
                     err.append(result.error)
                 pm.update(msg="")
 
-        self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(self._ExecuteOne, opt.revision, nb),
-            all_projects,
-            callback=_ProcessResults,
-            output=Progress(
-                f"Starting {nb}", len(all_projects), quiet=opt.quiet
-            ),
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = all_projects
+            self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(
+                    self._ExecuteOne,
+                    opt.revision,
+                    nb,
+                    self.manifest.default.revisionExpr,
+                ),
+                range(len(all_projects)),
+                callback=_ProcessResults,
+                output=Progress(
+                    f"Starting {nb}", len(all_projects), quiet=opt.quiet
+                ),
+                chunksize=1,
+            )
 
         if err_projects:
             for p in err_projects:
diff --git a/subcmds/status.py b/subcmds/status.py
index dac61ab..cda7362 100644
--- a/subcmds/status.py
+++ b/subcmds/status.py
@@ -88,7 +88,8 @@
             "projects",
         )
 
-    def _StatusHelper(self, quiet, local, project):
+    @classmethod
+    def _StatusHelper(cls, quiet, local, project_idx):
         """Obtains the status for a specific project.
 
         Obtains the status for a project, redirecting the output to
@@ -99,12 +100,13 @@
             local: a boolean, if True, the path is relative to the local
                 (sub)manifest.  If false, the path is relative to the outermost
                 manifest.
-            project: Project to get status of.
+            project_idx: Project index to get status of.
 
         Returns:
             The status of the project.
         """
         buf = io.StringIO()
+        project = cls.get_parallel_context()["projects"][project_idx]
         ret = project.PrintWorkTreeStatus(
             quiet=quiet, output_redir=buf, local=local
         )
@@ -143,15 +145,18 @@
                     ret += 1
             return ret
 
-        counter = self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(
-                self._StatusHelper, opt.quiet, opt.this_manifest_only
-            ),
-            all_projects,
-            callback=_ProcessResults,
-            ordered=True,
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = all_projects
+            counter = self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(
+                    self._StatusHelper, opt.quiet, opt.this_manifest_only
+                ),
+                range(len(all_projects)),
+                callback=_ProcessResults,
+                ordered=True,
+                chunksize=1,
+            )
 
         if not opt.quiet and len(all_projects) == counter:
             print("nothing to commit (working directory clean)")
diff --git a/subcmds/upload.py b/subcmds/upload.py
index 8039a1c..6344ee3 100644
--- a/subcmds/upload.py
+++ b/subcmds/upload.py
@@ -713,16 +713,17 @@
         merge_branch = p.stdout.strip()
         return merge_branch
 
-    @staticmethod
-    def _GatherOne(opt, project):
+    @classmethod
+    def _GatherOne(cls, opt, project_idx):
         """Figure out the upload status for |project|."""
+        project = cls.get_parallel_context()["projects"][project_idx]
         if opt.current_branch:
             cbr = project.CurrentBranch
             up_branch = project.GetUploadableBranch(cbr)
             avail = [up_branch] if up_branch else None
         else:
             avail = project.GetUploadableBranches(opt.branch)
-        return (project, avail)
+        return (project_idx, avail)
 
     def Execute(self, opt, args):
         projects = self.GetProjects(
@@ -732,8 +733,9 @@
         def _ProcessResults(_pool, _out, results):
             pending = []
             for result in results:
-                project, avail = result
+                project_idx, avail = result
                 if avail is None:
+                    project = projects[project_idx]
                     logger.error(
                         'repo: error: %s: Unable to upload branch "%s". '
                         "You might be able to fix the branch by running:\n"
@@ -746,12 +748,14 @@
                     pending.append(result)
             return pending
 
-        pending = self.ExecuteInParallel(
-            opt.jobs,
-            functools.partial(self._GatherOne, opt),
-            projects,
-            callback=_ProcessResults,
-        )
+        with self.ParallelContext():
+            self.get_parallel_context()["projects"] = projects
+            pending = self.ExecuteInParallel(
+                opt.jobs,
+                functools.partial(self._GatherOne, opt),
+                range(len(projects)),
+                callback=_ProcessResults,
+            )
 
         if not pending:
             if opt.branch is None: