Destination: extract StateLock class to simplify lock handling

Change-Id: If46d2b1ece73e32a7463c02988ccd1d4e2f769aa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index fc9d91d..3d7ae36 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -71,7 +71,6 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -85,6 +84,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
@@ -106,8 +106,42 @@
     Destination create(DestinationConfiguration config);
   }
 
+  private static class StateLock {
+    private final Striped<ReadWriteLock> stateLock;
+
+    StateLock(int numStripes) {
+      stateLock = Striped.readWriteLock(numStripes);
+    }
+
+    <V> V withWriteLock(URIish uri, Supplier<V> task) {
+      Lock lock = stateLock.get(uri).writeLock();
+      lock.lock();
+      try {
+        return task.get();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    <V> V withReadLock(URIish uri, Supplier<V> task) {
+      Lock lock = stateLock.get(uri).readLock();
+      lock.lock();
+      try {
+        return task.get();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private static class RescheduleStatus {
+    boolean isRescheduled;
+    boolean isFailed;
+    RemoteRefUpdate.Status failedStatus;
+  }
+
   private final ReplicationStateListener stateLog;
-  private final Striped<ReadWriteLock> stateLock;
+  private final StateLock stateLock;
   // writes are covered by the stateLock, but some reads are still
   // allowed without the lock
   private final Queue queue;
@@ -168,7 +202,7 @@
 
     ImmutableList<String> projects = cfg.getProjects();
     int numStripes = projects.isEmpty() ? MAX_STRIPES : Math.min(projects.size(), MAX_STRIPES);
-    stateLock = Striped.readWriteLock(numStripes);
+    stateLock = new StateLock(numStripes);
 
     CurrentUser remoteUser;
     if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -290,13 +324,7 @@
     // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
     // values to loop over them.
     for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
-      Lock lock = stateLock.get(pushOne.getURI()).writeLock();
-      lock.lock();
-      try {
-        pushOneFunction.apply(pushOne);
-      } finally {
-        lock.unlock();
-      }
+      stateLock.withWriteLock(pushOne.getURI(), () -> pushOneFunction.apply(pushOne));
     }
   }
 
@@ -438,14 +466,7 @@
     repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri);
 
     if (!config.replicatePermissions()) {
-      PushOne e;
-      Lock lock = stateLock.get(uri).readLock();
-      lock.lock();
-      try {
-        e = getPendingPush(uri);
-      } finally {
-        lock.unlock();
-      }
+      PushOne e = stateLock.withReadLock(uri, () -> getPendingPush(uri));
       if (e == null) {
         try (Repository git = gitManager.openRepository(project)) {
           try {
@@ -467,38 +488,37 @@
     }
 
     ImmutableSet<String> refsToSchedule = toSchedule.build();
-    PushOne task;
-    Lock lock = stateLock.get(uri).writeLock();
-    lock.lock();
-    try {
-      task = getPendingPush(uri);
-      if (task == null) {
-        task = opFactory.create(project, uri);
-        task.addRefBatch(refsToSchedule);
-        task.addState(refsToSchedule, state);
-        @SuppressWarnings("unused")
-        ScheduledFuture<?> ignored =
-            pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
-        queue.pending.put(uri, task);
-        repLog.atInfo().log(
-            "scheduled %s:%s => %s to run %s",
-            project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
-      } else {
-        boolean added = task.addRefBatch(refsToSchedule);
-        task.addState(refsToSchedule, state);
-        String message = "consolidated %s:%s => %s with an existing pending push";
-        if (added || !fromStorage) {
-          repLog.atInfo().log(message, project, refsToSchedule, task);
-        } else {
-          repLog.atFine().log(message, project, refsToSchedule, task);
-        }
-      }
-      for (String ref : refsToSchedule) {
-        state.increasePushTaskCount(project.get(), ref);
-      }
-    } finally {
-      lock.unlock();
-    }
+    PushOne task =
+        stateLock.withWriteLock(
+            uri,
+            () -> {
+              PushOne t = getPendingPush(uri);
+              if (t == null) {
+                t = opFactory.create(project, uri);
+                t.addRefBatch(refsToSchedule);
+                t.addState(refsToSchedule, state);
+                @SuppressWarnings("unused")
+                ScheduledFuture<?> ignored =
+                    pool.schedule(t, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+                queue.pending.put(uri, t);
+                repLog.atInfo().log(
+                    "scheduled %s:%s => %s to run %s",
+                    project, refsToSchedule, t, now ? "now" : "after " + config.getDelay() + "s");
+              } else {
+                boolean added = t.addRefBatch(refsToSchedule);
+                t.addState(refsToSchedule, state);
+                String message = "consolidated %s:%s => %s with an existing pending push";
+                if (added || !fromStorage) {
+                  repLog.atInfo().log(message, project, refsToSchedule, t);
+                } else {
+                  repLog.atFine().log(message, project, refsToSchedule, t);
+                }
+              }
+              for (String ref : refsToSchedule) {
+                state.increasePushTaskCount(project.get(), ref);
+              }
+              return t;
+            });
     postReplicationScheduledEvent(task, refsToSchedule);
   }
 
@@ -512,16 +532,14 @@
   }
 
   void pushWasCanceled(PushOne pushOp) {
-    Set<ImmutableSet<String>> notAttemptedRefs = Collections.emptySet();
-    Lock lock = stateLock.get(pushOp.getURI()).writeLock();
-    lock.lock();
-    try {
-      URIish uri = pushOp.getURI();
-      queue.pending.remove(uri);
-      notAttemptedRefs = pushOp.getRefs();
-    } finally {
-      lock.unlock();
-    }
+    Set<ImmutableSet<String>> notAttemptedRefs =
+        stateLock.withWriteLock(
+            pushOp.getURI(),
+            () -> {
+              URIish uri = pushOp.getURI();
+              queue.pending.remove(uri);
+              return pushOp.getRefs();
+            });
     pushOp.notifyNotAttempted(notAttemptedRefs);
   }
 
@@ -561,131 +579,126 @@
    * @param pushOp The PushOp instance to be scheduled.
    */
   void reschedule(PushOne pushOp, RetryReason reason) {
-    boolean isRescheduled = false;
-    boolean isFailed = false;
-    RemoteRefUpdate.Status failedStatus = null;
+    RescheduleStatus status = new RescheduleStatus();
+    stateLock.withWriteLock(
+        pushOp.getURI(),
+        () -> {
+          URIish uri = pushOp.getURI();
+          PushOne pendingPushOp = getPendingPush(uri);
 
-    Lock lock = stateLock.get(pushOp.getURI()).writeLock();
-    lock.lock();
-    try {
-      URIish uri = pushOp.getURI();
-      PushOne pendingPushOp = getPendingPush(uri);
+          if (pendingPushOp != null) {
+            // There is one PushOp instance already pending to same URI.
 
-      if (pendingPushOp != null) {
-        // There is one PushOp instance already pending to same URI.
+            if (pendingPushOp.isRetrying()) {
+              // The one pending is one already retrying, so it should
+              // maintain it and add to it the refs of the one passed
+              // as parameter to the method.
 
-        if (pendingPushOp.isRetrying()) {
-          // The one pending is one already retrying, so it should
-          // maintain it and add to it the refs of the one passed
-          // as parameter to the method.
+              // This scenario would happen if a PushOp has started running
+              // and then before it failed due transport exception, another
+              // one to same URI started. The first one would fail and would
+              // be rescheduled, being present in pending list. When the
+              // second one fails, it will also be rescheduled and then,
+              // here, find out replication to its URI is already pending
+              // for retry (blocking).
+              pendingPushOp.addRefBatches(pushOp.getRefs());
+              pendingPushOp.addStates(pushOp.getStates());
+              pushOp.removeStates();
 
-          // This scenario would happen if a PushOp has started running
-          // and then before it failed due transport exception, another
-          // one to same URI started. The first one would fail and would
-          // be rescheduled, being present in pending list. When the
-          // second one fails, it will also be rescheduled and then,
-          // here, find out replication to its URI is already pending
-          // for retry (blocking).
-          pendingPushOp.addRefBatches(pushOp.getRefs());
-          pendingPushOp.addStates(pushOp.getStates());
-          pushOp.removeStates();
-
-        } else {
-          // The one pending is one that is NOT retrying, it was just
-          // scheduled believing no problem would happen. The one pending
-          // should be canceled, and this is done by setting its canceled
-          // flag, removing it from pending list, and adding its refs to
-          // the pushOp instance that should then, later, in this method,
-          // be scheduled for retry.
-
-          // Notice that the PushOp found pending will start running and,
-          // when notifying it is starting (with pending lock protection),
-          // it will see it was canceled and then it will do nothing with
-          // pending list and it will not execute its run implementation.
-          pendingPushOp.canceledByReplication();
-          queue.pending.remove(uri);
-
-          pushOp.addRefBatches(pendingPushOp.getRefs());
-          pushOp.addStates(pendingPushOp.getStates());
-          pendingPushOp.removeStates();
-        }
-      }
-
-      if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
-        queue.pending.put(uri, pushOp);
-        switch (reason) {
-          case COLLISION:
-            @SuppressWarnings("unused")
-            ScheduledFuture<?> ignored =
-                pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
-            break;
-          case TRANSPORT_ERROR:
-          case REPOSITORY_MISSING:
-          default:
-            failedStatus =
-                RetryReason.REPOSITORY_MISSING.equals(reason)
-                    ? NON_EXISTING
-                    : REJECTED_OTHER_REASON;
-            isFailed = true;
-            if (pushOp.setToRetry()) {
-              isRescheduled = true;
-              replicationTasksStorage.get().reset(pushOp);
-              @SuppressWarnings("unused")
-              ScheduledFuture<?> ignored2 =
-                  pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
             } else {
-              pushOp.canceledByReplication();
-              pushOp.retryDone();
+              // The one pending is one that is NOT retrying, it was just
+              // scheduled believing no problem would happen. The one pending
+              // should be canceled, and this is done by setting its canceled
+              // flag, removing it from pending list, and adding its refs to
+              // the pushOp instance that should then, later, in this method,
+              // be scheduled for retry.
+
+              // Notice that the PushOp found pending will start running and,
+              // when notifying it is starting (with pending lock protection),
+              // it will see it was canceled and then it will do nothing with
+              // pending list and it will not execute its run implementation.
+              pendingPushOp.canceledByReplication();
               queue.pending.remove(uri);
-              stateLog.error(
-                  "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
-                  pushOp.getStatesAsArray());
+
+              pushOp.addRefBatches(pendingPushOp.getRefs());
+              pushOp.addStates(pendingPushOp.getStates());
+              pendingPushOp.removeStates();
             }
-            break;
-        }
-      }
-    } finally {
-      lock.unlock();
+          }
+
+          if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
+            queue.pending.put(uri, pushOp);
+            switch (reason) {
+              case COLLISION:
+                @SuppressWarnings("unused")
+                ScheduledFuture<?> ignored =
+                    pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+                break;
+              case TRANSPORT_ERROR:
+              case REPOSITORY_MISSING:
+              default:
+                status.failedStatus =
+                    RetryReason.REPOSITORY_MISSING.equals(reason)
+                        ? NON_EXISTING
+                        : REJECTED_OTHER_REASON;
+                status.isFailed = true;
+                if (pushOp.setToRetry()) {
+                  status.isRescheduled = true;
+                  replicationTasksStorage.get().reset(pushOp);
+                  @SuppressWarnings("unused")
+                  ScheduledFuture<?> ignored2 =
+                      pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
+                } else {
+                  pushOp.canceledByReplication();
+                  pushOp.retryDone();
+                  queue.pending.remove(uri);
+                  stateLog.error(
+                      "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
+                      pushOp.getStatesAsArray());
+                }
+                break;
+            }
+          }
+          return status;
+        });
+
+    if (status.isFailed) {
+      postReplicationFailedEvent(pushOp, status.failedStatus);
     }
-    if (isFailed) {
-      postReplicationFailedEvent(pushOp, failedStatus);
-    }
-    if (isRescheduled) {
+    if (status.isRescheduled) {
       postReplicationScheduledEvent(pushOp);
     }
   }
 
   RunwayStatus requestRunway(PushOne op) {
-    Lock lock = stateLock.get(op.getURI()).writeLock();
-    lock.lock();
-    try {
-      if (op.wasCanceled()) {
-        return RunwayStatus.canceled();
-      }
-      queue.pending.remove(op.getURI());
-      PushOne inFlightOp = queue.inFlight.get(op.getURI());
-      if (inFlightOp != null) {
-        return RunwayStatus.denied(inFlightOp.getId());
-      }
-      op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op)));
-      queue.inFlight.put(op.getURI(), op);
-    } finally {
-      lock.unlock();
-    }
+    stateLock.withWriteLock(
+        op.getURI(),
+        () -> {
+          if (op.wasCanceled()) {
+            return RunwayStatus.canceled();
+          }
+          queue.pending.remove(op.getURI());
+          PushOne inFlightOp = queue.inFlight.get(op.getURI());
+          if (inFlightOp != null) {
+            return RunwayStatus.denied(inFlightOp.getId());
+          }
+          op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op)));
+          queue.inFlight.put(op.getURI(), op);
+          return null;
+        });
     return RunwayStatus.allowed();
   }
 
   void notifyFinished(PushOne op) {
-    Lock lock = stateLock.get(op.getURI()).writeLock();
-    lock.lock();
-    try {
-      if (!op.isRetrying()) {
-        replicationTasksStorage.get().finish(op);
-      }
-      queue.inFlight.remove(op.getURI());
-    } finally {
-      lock.unlock();
-    }
+    stateLock.withWriteLock(
+        op.getURI(),
+        () -> {
+          if (!op.isRetrying()) {
+            replicationTasksStorage.get().finish(op);
+          }
+          queue.inFlight.remove(op.getURI());
+          return null;
+        });
   }
 
   public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {