Schedule replication by remote, not by project

We now create a thread pool for each [remote] block found inside of
replication.config.  All URLs within that remote block share the same
pool of worker threads.  By defining different remote blocks for each
URL the administrator is able to isolate slow WAN links from faster
local warm-spares, ensuring that the more local systems are able to
stay current, even if the WAN gets really far behind.

Bug: GERRIT-200
Signed-off-by: Shawn O. Pearce <sop@google.com>
diff --git a/Documentation/config-replication.txt b/Documentation/config-replication.txt
index d89267b..6c31855 100644
--- a/Documentation/config-replication.txt
+++ b/Documentation/config-replication.txt
@@ -44,6 +44,7 @@
     url = mirror3.us.some.org:/pub/git/${name}.git
     push = +refs/heads/*
     push = +refs/tags/*
+    threads = 3
 ====
 
 File `replication.config`[[replication_config]]
@@ -56,6 +57,13 @@
 section provides common configuration settings for one or more
 destination URLs.
 
+Each remote section uses its own thread pool.  If pushing to
+multiple remotes, over differing types of network connections
+(e.g. LAN and also public Internet), its a good idea to put them
+into different remote sections, so that replication to the slower
+connection does not starve out the faster local one.  The example
+file above does this.
+
 Section remote[[section_remote]]
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -65,9 +73,10 @@
 
 remote.<name>.url::
 +
-Address of the remote server to push to.  Multiple URLs may be
-specified within a single remote block, listing different destinations
-which share the same settings.  Gerrit pushes to all URLs in parallel,
+Address of the remote server to push to.  Multiple URLs may
+be specified within a single remote block, listing different
+destinations which share the same settings.  Assuming sufficient
+threads in the thread pool, Gerrit pushes to all URLs in parallel,
 using one thread per URL.
 +
 Within each URL value the magic placeholder `$\{name}` is replaced
@@ -115,6 +124,17 @@
 +
 By default, 15 seconds.
 
+remote.<name>.threads::
++
+Number of worker threads to dedicate to pushing to the repositories
+described by this remote.  Each thread can push one project at a
+time, to one destination URL.  Scheduling within the thread pool
+is done on a per-project basis.  If a remote block describes 4
+URLs, allocating 4 threads in the pool will permit some level of
+parallel pushing.
++
+By default, 1 thread.
+
 
 File `~/.ssh/config`[[ssh_config]]
 ----------------------------------
diff --git a/src/main/java/com/google/gerrit/git/PushOp.java b/src/main/java/com/google/gerrit/git/PushOp.java
index 7695f22..eae2c50 100644
--- a/src/main/java/com/google/gerrit/git/PushOp.java
+++ b/src/main/java/com/google/gerrit/git/PushOp.java
@@ -54,6 +54,7 @@
   static final String MIRROR_ALL = "..all..";
 
   private final Set<String> delta = new HashSet<String>();
+  private final PushQueue.ReplicationConfig pool;
   private final String projectName;
   private final RemoteConfig config;
   private final URIish uri;
@@ -61,7 +62,9 @@
 
   private Repository db;
 
-  PushOp(final String d, final RemoteConfig c, final URIish u) {
+  PushOp(final PushQueue.ReplicationConfig p, final String d,
+      final RemoteConfig c, final URIish u) {
+    pool = p;
     projectName = d;
     config = c;
     uri = u;
@@ -86,7 +89,7 @@
       // we start replication (instead a new instance, with the same URI, is
       // created and scheduled for a future point in time.)
       //
-      PushQueue.notifyStarting(this);
+      pool.notifyStarting(this);
       openRepository();
       runImpl();
     } catch (OrmException e) {
diff --git a/src/main/java/com/google/gerrit/git/PushQueue.java b/src/main/java/com/google/gerrit/git/PushQueue.java
index 1c94388..7c73455 100644
--- a/src/main/java/com/google/gerrit/git/PushQueue.java
+++ b/src/main/java/com/google/gerrit/git/PushQueue.java
@@ -46,8 +46,6 @@
 public class PushQueue {
   static final Logger log = LoggerFactory.getLogger(PushQueue.class);
   private static List<ReplicationConfig> configs;
-  private static final Map<URIish, PushOp> pending =
-      new HashMap<URIish, PushOp>();
 
   static {
     // Install our own factory which always runs in batch mode, as we
@@ -81,7 +79,7 @@
       final String urlMatch) {
     for (final ReplicationConfig cfg : allConfigs()) {
       for (final URIish uri : cfg.getURIs(project, urlMatch)) {
-        scheduleImp(project, PushOp.MIRROR_ALL, cfg, uri);
+        cfg.schedule(project, PushOp.MIRROR_ALL, uri);
       }
     }
   }
@@ -101,27 +99,12 @@
     for (final ReplicationConfig cfg : allConfigs()) {
       if (cfg.wouldPushRef(ref)) {
         for (final URIish uri : cfg.getURIs(project, null)) {
-          scheduleImp(project, ref, cfg, uri);
+          cfg.schedule(project, ref, uri);
         }
       }
     }
   }
 
-  private static synchronized void scheduleImp(final Project.NameKey project,
-      final String ref, final ReplicationConfig config, final URIish uri) {
-    PushOp e = pending.get(uri);
-    if (e == null) {
-      e = new PushOp(project.get(), config.remote, uri);
-      WorkQueue.schedule(e, config.delay, TimeUnit.SECONDS);
-      pending.put(uri, e);
-    }
-    e.addRef(ref);
-  }
-
-  static synchronized void notifyStarting(final PushOp op) {
-    pending.remove(op.getURI());
-  }
-
   private static String replace(final String pat, final String key,
       final String val) {
     final int n = pat.indexOf("${" + key + "}");
@@ -181,28 +164,44 @@
         log.error("Invalid URI in " + cfgFile + ": " + e.getMessage());
         return Collections.emptyList();
       }
-
-      int destCnt = 0;
-      for (final ReplicationConfig c : configs) {
-        destCnt += c.remote.getURIs().size();
-      }
-      WorkQueue.adviseThreadCount(destCnt);
     }
     return configs;
   }
 
-  private static class ReplicationConfig {
-    final RemoteConfig remote;
-    final int delay;
+  static class ReplicationConfig {
+    private final RemoteConfig remote;
+    private final int delay;
+    private final WorkQueue.Executor pool;
+    private final Map<URIish, PushOp> pending = new HashMap<URIish, PushOp>();
 
     ReplicationConfig(final RemoteConfig rc, final RepositoryConfig cfg) {
       remote = rc;
-      delay = posInt(rc, cfg, "replicationdelay", 15);
+      delay = Math.max(0, getInt(rc, cfg, "replicationdelay", 15));
+      pool = WorkQueue.createQueue(Math.max(0, getInt(rc, cfg, "threads", 1)));
     }
 
-    private static int posInt(final RemoteConfig rc,
+    private static int getInt(final RemoteConfig rc,
         final RepositoryConfig cfg, final String name, final int defValue) {
-      return Math.max(0, cfg.getInt("remote", rc.getName(), name, defValue));
+      return cfg.getInt("remote", rc.getName(), name, defValue);
+    }
+
+    void schedule(final Project.NameKey project, final String ref,
+        final URIish uri) {
+      synchronized (pending) {
+        PushOp e = pending.get(uri);
+        if (e == null) {
+          e = new PushOp(this, project.get(), remote, uri);
+          pool.schedule(e, delay, TimeUnit.SECONDS);
+          pending.put(uri, e);
+        }
+        e.addRef(ref);
+      }
+    }
+
+    void notifyStarting(final PushOp op) {
+      synchronized (pending) {
+        pending.remove(op.getURI());
+      }
     }
 
     boolean wouldPushRef(final String ref) {
diff --git a/src/main/java/com/google/gerrit/git/WorkQueue.java b/src/main/java/com/google/gerrit/git/WorkQueue.java
index 43e375a..3cc44e8 100644
--- a/src/main/java/com/google/gerrit/git/WorkQueue.java
+++ b/src/main/java/com/google/gerrit/git/WorkQueue.java
@@ -19,6 +19,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableScheduledFuture;
@@ -28,27 +29,33 @@
 
 /** Delayed execution of tasks using a background thread pool. */
 public class WorkQueue {
-  private static Executor pool;
+  private static Executor defaultQueue;
+  private static final CopyOnWriteArrayList<Executor> queues =
+      new CopyOnWriteArrayList<Executor>();
 
-  private static synchronized Executor getPool(final boolean autoStart) {
-    if (autoStart && pool == null) {
-      pool = new Executor(1);
+  private static synchronized Executor getDefaultQueue() {
+    if (defaultQueue == null) {
+      defaultQueue = createQueue(1);
     }
-    return pool;
+    return defaultQueue;
   }
 
-  static void adviseThreadCount(final int callerWants) {
-    final Executor p = getPool(true);
-    p.setMaximumPoolSize(1 + callerWants);
-    p.setCorePoolSize(1 + callerWants);
+  /** Create a new executor queue with one thread. */
+  public static Executor createQueue(final int poolsize) {
+    final Executor r = new Executor(poolsize);
+    r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+    r.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    queues.add(r);
+    return r;
   }
 
   /** Get all of the tasks currently scheduled in the work queue. */
   public static Task<?>[] getTasks() {
-    final Executor p = getPool(false);
-    final Task<?>[] r;
-    r = p != null ? p.toTaskArray() : new Task[] {};
-    return r;
+    final List<Task<?>> r = new ArrayList<Task<?>>();
+    for (final Executor e : queues) {
+      e.addAllTo(r);
+    }
+    return r.toArray(new Task[r.size()]);
   }
 
   /**
@@ -62,13 +69,13 @@
    */
   public static void schedule(final Runnable task, final long delay,
       final TimeUnit unit) {
-    getPool(true).schedule(task, delay, unit);
+    getDefaultQueue().schedule(task, delay, unit);
   }
 
   /** Shutdown the work queue, aborting any pending tasks that haven't started. */
   public static void terminate() {
-    final ScheduledThreadPoolExecutor p = shutdown();
-    if (p != null) {
+    for (final Executor p : queues) {
+      p.shutdown();
       boolean isTerminated;
       do {
         try {
@@ -78,20 +85,12 @@
         }
       } while (!isTerminated);
     }
+    queues.clear();
   }
 
-  private static synchronized ScheduledThreadPoolExecutor shutdown() {
-    final ScheduledThreadPoolExecutor p = pool;
-    if (p != null) {
-      p.shutdown();
-      pool = null;
-      return p;
-    }
-    return null;
-  }
-
+  /** An isolated queue. */
+  public static class Executor extends ScheduledThreadPoolExecutor {
     private final Set<Task<?>> active = new HashSet<Task<?>>();
-  private static class Executor extends ScheduledThreadPoolExecutor {
 
     Executor(final int corePoolSize) {
       super(corePoolSize);
@@ -125,15 +124,13 @@
       }
     }
 
-    Task<?>[] toTaskArray() {
-      final List<Task<?>> list = new ArrayList<Task<?>>();
+    void addAllTo(final List<Task<?>> list) {
       synchronized (active) {
         list.addAll(active);
       }
       for (final Runnable task : getQueue()) { // iterator is thread safe
         list.add((Task<?>) task);
       }
-      return list.toArray(new Task<?>[list.size()]);
     }
   }