forall: improve pool logic

Use a pool contextmanager to take care of the messy details like
properly cleaning it up when aborting.

Change-Id: I264ebb591c2e67c9a975b6dcc0f14b29cc66a874
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297243
Reviewed-by: Chris Mcdonald <cjmcdonald@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
diff --git a/subcmds/forall.py b/subcmds/forall.py
index d871b3e..b874b6d 100644
--- a/subcmds/forall.py
+++ b/subcmds/forall.py
@@ -21,7 +21,7 @@
 import subprocess
 
 from color import Coloring
-from command import DEFAULT_LOCAL_JOBS, Command, MirrorSafeCommand
+from command import DEFAULT_LOCAL_JOBS, Command, MirrorSafeCommand, WORKER_BATCH_SIZE
 import platform_utils
 
 _CAN_COLOR = [
@@ -234,31 +234,26 @@
 
     os.environ['REPO_COUNT'] = str(len(projects))
 
-    pool = multiprocessing.Pool(opt.jobs, InitWorker)
     try:
       config = self.manifest.manifestProject.config
-      results_it = pool.imap(
-          DoWorkWrapper,
-          self.ProjectArgs(projects, mirror, opt, cmd, shell, config))
-      pool.close()
-      for r in results_it:
-        rc = rc or r
-        if r != 0 and opt.abort_on_errors:
-          raise Exception('Aborting due to previous error')
+      with multiprocessing.Pool(opt.jobs, InitWorker) as pool:
+        results_it = pool.imap(
+            DoWorkWrapper,
+            self.ProjectArgs(projects, mirror, opt, cmd, shell, config),
+            chunksize=WORKER_BATCH_SIZE)
+        for r in results_it:
+          rc = rc or r
+          if r != 0 and opt.abort_on_errors:
+            raise Exception('Aborting due to previous error')
     except (KeyboardInterrupt, WorkerKeyboardInterrupt):
       # Catch KeyboardInterrupt raised inside and outside of workers
-      print('Interrupted - terminating the pool')
-      pool.terminate()
       rc = rc or errno.EINTR
     except Exception as e:
       # Catch any other exceptions raised
       print('Got an error, terminating the pool: %s: %s' %
             (type(e).__name__, e),
             file=sys.stderr)
-      pool.terminate()
       rc = rc or getattr(e, 'errno', 1)
-    finally:
-      pool.join()
     if rc != 0:
       sys.exit(rc)