Destination: use striped lock to reduce lock contention

Destination used a single lock to serialize pushes to the same
destination. When many repositories are replicated via one
destination this may cause lock contention and unnecessarily
throttle replication throughput.

Hence use a striped lock with up to 16 stripes to reduce lock
contention but still serialize replication per repository.
Don't use more stripes than the number of projects to be
replicated if the projects are configured explicitly.

Avoid copying the pending and in-flight maps of PushOnes to be
executed for reading them. Instead directly access the maps held
by a destination and use thread-safe ConcurrentHashMaps for both.

Change-Id: I0ad2f51249ffba429fc61b422056fa85d88b1de4
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 3bb1d2d..fc9d91d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -22,11 +22,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.google.common.net.UrlEscapers;
+import com.google.common.util.concurrent.Striped;
 import com.google.gerrit.common.Nullable;
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.entities.BranchNameKey;
@@ -82,6 +82,8 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Constants;
@@ -94,6 +96,7 @@
 
 public class Destination {
   private static final NamedFluentLogger repLog = ReplicationQueue.repLog;
+  private static final int MAX_STRIPES = 16;
 
   private static final String PROJECT_NOT_AVAILABLE = "source project %s not available";
 
@@ -104,11 +107,10 @@
   }
 
   private final ReplicationStateListener stateLog;
-  private final Object stateLock = new Object();
+  private final Striped<ReadWriteLock> stateLock;
   // writes are covered by the stateLock, but some reads are still
   // allowed without the lock
-  private final ConcurrentMap<URIish, PushOne> pending = new ConcurrentHashMap<>();
-  private final Map<URIish, PushOne> inFlight = new HashMap<>();
+  private final Queue queue;
   private final PushOne.Factory opFactory;
   private final DeleteProjectTask.Factory deleteProjectFactory;
   private final UpdateHeadTask.Factory updateHeadFactory;
@@ -128,13 +130,13 @@
     REPOSITORY_MISSING;
   }
 
-  public static class QueueInfo {
-    public final ImmutableMap<URIish, PushOne> pending;
-    public final ImmutableMap<URIish, PushOne> inFlight;
+  public static class Queue {
+    public final ConcurrentMap<URIish, PushOne> pending;
+    public final ConcurrentMap<URIish, PushOne> inFlight;
 
-    public QueueInfo(Map<URIish, PushOne> pending, Map<URIish, PushOne> inFlight) {
-      this.pending = ImmutableMap.copyOf(pending);
-      this.inFlight = ImmutableMap.copyOf(inFlight);
+    public Queue() {
+      this.pending = new ConcurrentHashMap<>();
+      this.inFlight = new ConcurrentHashMap<>();
     }
   }
 
@@ -155,6 +157,7 @@
       @Assisted DestinationConfiguration cfg) {
     this.eventDispatcher = eventDispatcher;
     gitManager = gitRepositoryManager;
+    this.queue = new Queue();
     this.permissionBackend = permissionBackend;
     this.userProvider = userProvider;
     this.projectCache = projectCache;
@@ -162,6 +165,11 @@
     this.replicationTasksStorage = rts;
     this.credentialsFactory = credentialsFactory;
     config = cfg;
+
+    ImmutableList<String> projects = cfg.getProjects();
+    int numStripes = projects.isEmpty() ? MAX_STRIPES : Math.min(projects.size(), MAX_STRIPES);
+    stateLock = Striped.readWriteLock(numStripes);
+
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
       ImmutableSet.Builder<AccountGroup.UUID> builder = ImmutableSet.builder();
@@ -237,10 +245,8 @@
     }
   }
 
-  public QueueInfo getQueueInfo() {
-    synchronized (stateLock) {
-      return new QueueInfo(pending, inFlight);
-    }
+  public Queue getQueue() {
+    return queue;
   }
 
   public void start(WorkQueue workQueue) {
@@ -251,33 +257,31 @@
   public int shutdown() {
     int cnt = 0;
     if (pool != null) {
-      synchronized (stateLock) {
-        int numPending = pending.size();
-        int numInFlight = inFlight.size();
+      int numPending = queue.pending.size();
+      int numInFlight = queue.inFlight.size();
 
-        if (numPending > 0 || numInFlight > 0) {
-          repLog.atWarning().log(
-              "Cancelling replication events (pending=%d, inFlight=%d) for destination %s",
-              numPending, numInFlight, getRemoteConfigName());
+      if (numPending > 0 || numInFlight > 0) {
+        repLog.atWarning().log(
+            "Cancelling replication events (pending=%d, inFlight=%d) for destination %s",
+            numPending, numInFlight, getRemoteConfigName());
 
-          foreachPushOp(
-              pending,
-              push -> {
-                push.cancel();
-                return null;
-              });
-          pending.clear();
-          foreachPushOp(
-              inFlight,
-              push -> {
-                push.setCanceledWhileRunning();
-                return null;
-              });
-          inFlight.clear();
-        }
-        cnt = pool.shutdownNow().size();
-        pool = null;
+        foreachPushOp(
+            queue.pending,
+            push -> {
+              push.cancel();
+              return null;
+            });
+        queue.pending.clear();
+        foreachPushOp(
+            queue.inFlight,
+            push -> {
+              push.setCanceledWhileRunning();
+              return null;
+            });
+        queue.inFlight.clear();
       }
+      cnt = pool.shutdownNow().size();
+      pool = null;
     }
     return cnt;
   }
@@ -286,7 +290,13 @@
     // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
     // values to loop over them.
     for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
-      pushOneFunction.apply(pushOne);
+      Lock lock = stateLock.get(pushOne.getURI()).writeLock();
+      lock.lock();
+      try {
+        pushOneFunction.apply(pushOne);
+      } finally {
+        lock.unlock();
+      }
     }
   }
 
@@ -429,8 +439,12 @@
 
     if (!config.replicatePermissions()) {
       PushOne e;
-      synchronized (stateLock) {
+      Lock lock = stateLock.get(uri).readLock();
+      lock.lock();
+      try {
         e = getPendingPush(uri);
+      } finally {
+        lock.unlock();
       }
       if (e == null) {
         try (Repository git = gitManager.openRepository(project)) {
@@ -454,7 +468,9 @@
 
     ImmutableSet<String> refsToSchedule = toSchedule.build();
     PushOne task;
-    synchronized (stateLock) {
+    Lock lock = stateLock.get(uri).writeLock();
+    lock.lock();
+    try {
       task = getPendingPush(uri);
       if (task == null) {
         task = opFactory.create(project, uri);
@@ -463,7 +479,7 @@
         @SuppressWarnings("unused")
         ScheduledFuture<?> ignored =
             pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
-        pending.put(uri, task);
+        queue.pending.put(uri, task);
         repLog.atInfo().log(
             "scheduled %s:%s => %s to run %s",
             project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
@@ -480,13 +496,15 @@
       for (String ref : refsToSchedule) {
         state.increasePushTaskCount(project.get(), ref);
       }
+    } finally {
+      lock.unlock();
     }
     postReplicationScheduledEvent(task, refsToSchedule);
   }
 
   @Nullable
   private PushOne getPendingPush(URIish uri) {
-    PushOne e = pending.get(uri);
+    PushOne e = queue.pending.get(uri);
     if (e != null && !e.wasCanceled()) {
       return e;
     }
@@ -495,10 +513,14 @@
 
   void pushWasCanceled(PushOne pushOp) {
     Set<ImmutableSet<String>> notAttemptedRefs = Collections.emptySet();
-    synchronized (stateLock) {
+    Lock lock = stateLock.get(pushOp.getURI()).writeLock();
+    lock.lock();
+    try {
       URIish uri = pushOp.getURI();
-      pending.remove(uri);
+      queue.pending.remove(uri);
       notAttemptedRefs = pushOp.getRefs();
+    } finally {
+      lock.unlock();
     }
     pushOp.notifyNotAttempted(notAttemptedRefs);
   }
@@ -543,7 +565,9 @@
     boolean isFailed = false;
     RemoteRefUpdate.Status failedStatus = null;
 
-    synchronized (stateLock) {
+    Lock lock = stateLock.get(pushOp.getURI()).writeLock();
+    lock.lock();
+    try {
       URIish uri = pushOp.getURI();
       PushOne pendingPushOp = getPendingPush(uri);
 
@@ -579,7 +603,7 @@
           // it will see it was canceled and then it will do nothing with
           // pending list and it will not execute its run implementation.
           pendingPushOp.canceledByReplication();
-          pending.remove(uri);
+          queue.pending.remove(uri);
 
           pushOp.addRefBatches(pendingPushOp.getRefs());
           pushOp.addStates(pendingPushOp.getStates());
@@ -588,7 +612,7 @@
       }
 
       if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
-        pending.put(uri, pushOp);
+        queue.pending.put(uri, pushOp);
         switch (reason) {
           case COLLISION:
             @SuppressWarnings("unused")
@@ -612,7 +636,7 @@
             } else {
               pushOp.canceledByReplication();
               pushOp.retryDone();
-              pending.remove(uri);
+              queue.pending.remove(uri);
               stateLog.error(
                   "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
                   pushOp.getStatesAsArray());
@@ -620,6 +644,8 @@
             break;
         }
       }
+    } finally {
+      lock.unlock();
     }
     if (isFailed) {
       postReplicationFailedEvent(pushOp, failedStatus);
@@ -630,33 +656,41 @@
   }
 
   RunwayStatus requestRunway(PushOne op) {
-    synchronized (stateLock) {
+    Lock lock = stateLock.get(op.getURI()).writeLock();
+    lock.lock();
+    try {
       if (op.wasCanceled()) {
         return RunwayStatus.canceled();
       }
-      pending.remove(op.getURI());
-      PushOne inFlightOp = inFlight.get(op.getURI());
+      queue.pending.remove(op.getURI());
+      PushOne inFlightOp = queue.inFlight.get(op.getURI());
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
       op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op)));
-      inFlight.put(op.getURI(), op);
+      queue.inFlight.put(op.getURI(), op);
+    } finally {
+      lock.unlock();
     }
     return RunwayStatus.allowed();
   }
 
   void notifyFinished(PushOne op) {
-    synchronized (stateLock) {
+    Lock lock = stateLock.get(op.getURI()).writeLock();
+    lock.lock();
+    try {
       if (!op.isRetrying()) {
         replicationTasksStorage.get().finish(op);
       }
-      inFlight.remove(op.getURI());
+      queue.inFlight.remove(op.getURI());
+    } finally {
+      lock.unlock();
     }
   }
 
   public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {
     Map<ReplicateRefUpdate, String> taskNameByReplicateRefUpdate = new HashMap<>();
-    for (PushOne push : pending.values()) {
+    for (PushOne push : queue.pending.values()) {
       String taskName = push.toString();
       for (ReplicateRefUpdate refUpdate : push.getReplicateRefUpdates()) {
         taskNameByReplicateRefUpdate.put(refUpdate, taskName);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
index dc3b05e..471a408 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -35,6 +35,7 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.Destination.Queue;
 import com.googlesource.gerrit.plugins.replication.api.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.api.ReplicationConfig.FilterType;
 import java.net.URISyntaxException;
@@ -227,8 +228,9 @@
     if (drainQueueAttempts == 0) {
       return;
     }
-    int pending = destination.getQueueInfo().pending.size();
-    int inFlight = destination.getQueueInfo().inFlight.size();
+    Queue queue = destination.getQueue();
+    int pending = queue.pending.size();
+    int inFlight = queue.inFlight.size();
     while ((inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
       try {
         logger.atInfo().log(
@@ -239,8 +241,8 @@
         logger.atWarning().withCause(ie).log(
             "Wait for replication events to drain has been interrupted");
       }
-      pending = destination.getQueueInfo().pending.size();
-      inFlight = destination.getQueueInfo().inFlight.size();
+      pending = queue.pending.size();
+      inFlight = queue.inFlight.size();
       drainQueueAttempts--;
     }
     if (pending > 0 || inFlight > 0) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
index 8f4e0a1..213d0de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ListCommand.java
@@ -90,7 +90,7 @@
         addProperty(obj, "AdminUrl", d.getAdminUrls());
         addProperty(obj, "AuthGroup", d.getAuthGroupNames());
         addProperty(obj, "Project", d.getProjects());
-        Destination.QueueInfo q = d.getQueueInfo();
+        Destination.Queue q = d.getQueue();
         addQueueDetails(obj, "InFlight", q.inFlight.values());
         addQueueDetails(obj, "Pending", q.pending.values());
       }
@@ -115,7 +115,7 @@
           out.append("Project: ").append(project).append("\n");
         }
 
-        Destination.QueueInfo q = d.getQueueInfo();
+        Destination.Queue q = d.getQueue();
         out.append("In Flight: ").append(q.inFlight.size()).append("\n");
         addQueueDetails(out, q.inFlight.values());
         out.append("Pending: ").append(q.pending.size()).append("\n");
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
index e800cff..94646c3 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -22,7 +22,7 @@
 import com.google.gerrit.acceptance.WaitUtil;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.api.projects.BranchInput;
-import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo;
+import com.googlesource.gerrit.plugins.replication.Destination.Queue;
 import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
 import com.googlesource.gerrit.plugins.replication.api.ReplicationConfig.FilterType;
 import java.io.IOException;
@@ -303,11 +303,11 @@
     createTestProject(projectName);
 
     WaitUtil.waitUntil(
-        () -> isTaskRescheduled(destination.getQueueInfo(), urish), TEST_NEW_PROJECT_TIMEOUT);
+        () -> isTaskRescheduled(destination.getQueue(), urish), TEST_NEW_PROJECT_TIMEOUT);
     // replicationRetry is set to 1 minute which is the minimum value. That's why
     // should be safe to get the pushOne object from pending because it should be
     // here for one minute
-    PushOne pushOp = destination.getQueueInfo().pending.get(urish);
+    PushOne pushOp = destination.getQueue().pending.get(urish);
 
     WaitUtil.waitUntil(() -> pushOp.wasCanceled(), MAX_RETRY_WITH_TOLERANCE_TIMEOUT);
     WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT);
@@ -333,7 +333,7 @@
     assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1);
   }
 
-  private boolean isTaskRescheduled(QueueInfo queue, URIish uri) {
+  private boolean isTaskRescheduled(Queue queue, URIish uri) {
     PushOne pushOne = queue.pending.get(uri);
     return pushOne == null ? false : pushOne.isRetrying();
   }