Destination: extract StateLock class to simplify lock handling
Change-Id: If46d2b1ece73e32a7463c02988ccd1d4e2f769aa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index fc9d91d..3d7ae36 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -71,7 +71,6 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -85,6 +84,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Ref;
@@ -106,8 +106,42 @@
Destination create(DestinationConfiguration config);
}
+ private static class StateLock {
+ private final Striped<ReadWriteLock> stateLock;
+
+ StateLock(int numStripes) {
+ stateLock = Striped.readWriteLock(numStripes);
+ }
+
+ <V> V withWriteLock(URIish uri, Supplier<V> task) {
+ Lock lock = stateLock.get(uri).writeLock();
+ lock.lock();
+ try {
+ return task.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ <V> V withReadLock(URIish uri, Supplier<V> task) {
+ Lock lock = stateLock.get(uri).readLock();
+ lock.lock();
+ try {
+ return task.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private static class RescheduleStatus {
+ boolean isRescheduled;
+ boolean isFailed;
+ RemoteRefUpdate.Status failedStatus;
+ }
+
private final ReplicationStateListener stateLog;
- private final Striped<ReadWriteLock> stateLock;
+ private final StateLock stateLock;
// writes are covered by the stateLock, but some reads are still
// allowed without the lock
private final Queue queue;
@@ -168,7 +202,7 @@
ImmutableList<String> projects = cfg.getProjects();
int numStripes = projects.isEmpty() ? MAX_STRIPES : Math.min(projects.size(), MAX_STRIPES);
- stateLock = Striped.readWriteLock(numStripes);
+ stateLock = new StateLock(numStripes);
CurrentUser remoteUser;
if (!cfg.getAuthGroupNames().isEmpty()) {
@@ -290,13 +324,7 @@
// Callers may modify the provided opsMap concurrently, hence make a defensive copy of the
// values to loop over them.
for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) {
- Lock lock = stateLock.get(pushOne.getURI()).writeLock();
- lock.lock();
- try {
- pushOneFunction.apply(pushOne);
- } finally {
- lock.unlock();
- }
+ stateLock.withWriteLock(pushOne.getURI(), () -> pushOneFunction.apply(pushOne));
}
}
@@ -438,14 +466,7 @@
repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri);
if (!config.replicatePermissions()) {
- PushOne e;
- Lock lock = stateLock.get(uri).readLock();
- lock.lock();
- try {
- e = getPendingPush(uri);
- } finally {
- lock.unlock();
- }
+ PushOne e = stateLock.withReadLock(uri, () -> getPendingPush(uri));
if (e == null) {
try (Repository git = gitManager.openRepository(project)) {
try {
@@ -467,38 +488,37 @@
}
ImmutableSet<String> refsToSchedule = toSchedule.build();
- PushOne task;
- Lock lock = stateLock.get(uri).writeLock();
- lock.lock();
- try {
- task = getPendingPush(uri);
- if (task == null) {
- task = opFactory.create(project, uri);
- task.addRefBatch(refsToSchedule);
- task.addState(refsToSchedule, state);
- @SuppressWarnings("unused")
- ScheduledFuture<?> ignored =
- pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
- queue.pending.put(uri, task);
- repLog.atInfo().log(
- "scheduled %s:%s => %s to run %s",
- project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s");
- } else {
- boolean added = task.addRefBatch(refsToSchedule);
- task.addState(refsToSchedule, state);
- String message = "consolidated %s:%s => %s with an existing pending push";
- if (added || !fromStorage) {
- repLog.atInfo().log(message, project, refsToSchedule, task);
- } else {
- repLog.atFine().log(message, project, refsToSchedule, task);
- }
- }
- for (String ref : refsToSchedule) {
- state.increasePushTaskCount(project.get(), ref);
- }
- } finally {
- lock.unlock();
- }
+ PushOne task =
+ stateLock.withWriteLock(
+ uri,
+ () -> {
+ PushOne t = getPendingPush(uri);
+ if (t == null) {
+ t = opFactory.create(project, uri);
+ t.addRefBatch(refsToSchedule);
+ t.addState(refsToSchedule, state);
+ @SuppressWarnings("unused")
+ ScheduledFuture<?> ignored =
+ pool.schedule(t, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+ queue.pending.put(uri, t);
+ repLog.atInfo().log(
+ "scheduled %s:%s => %s to run %s",
+ project, refsToSchedule, t, now ? "now" : "after " + config.getDelay() + "s");
+ } else {
+ boolean added = t.addRefBatch(refsToSchedule);
+ t.addState(refsToSchedule, state);
+ String message = "consolidated %s:%s => %s with an existing pending push";
+ if (added || !fromStorage) {
+ repLog.atInfo().log(message, project, refsToSchedule, t);
+ } else {
+ repLog.atFine().log(message, project, refsToSchedule, t);
+ }
+ }
+ for (String ref : refsToSchedule) {
+ state.increasePushTaskCount(project.get(), ref);
+ }
+ return t;
+ });
postReplicationScheduledEvent(task, refsToSchedule);
}
@@ -512,16 +532,14 @@
}
void pushWasCanceled(PushOne pushOp) {
- Set<ImmutableSet<String>> notAttemptedRefs = Collections.emptySet();
- Lock lock = stateLock.get(pushOp.getURI()).writeLock();
- lock.lock();
- try {
- URIish uri = pushOp.getURI();
- queue.pending.remove(uri);
- notAttemptedRefs = pushOp.getRefs();
- } finally {
- lock.unlock();
- }
+ Set<ImmutableSet<String>> notAttemptedRefs =
+ stateLock.withWriteLock(
+ pushOp.getURI(),
+ () -> {
+ URIish uri = pushOp.getURI();
+ queue.pending.remove(uri);
+ return pushOp.getRefs();
+ });
pushOp.notifyNotAttempted(notAttemptedRefs);
}
@@ -561,131 +579,126 @@
* @param pushOp The PushOp instance to be scheduled.
*/
void reschedule(PushOne pushOp, RetryReason reason) {
- boolean isRescheduled = false;
- boolean isFailed = false;
- RemoteRefUpdate.Status failedStatus = null;
+ RescheduleStatus status = new RescheduleStatus();
+ stateLock.withWriteLock(
+ pushOp.getURI(),
+ () -> {
+ URIish uri = pushOp.getURI();
+ PushOne pendingPushOp = getPendingPush(uri);
- Lock lock = stateLock.get(pushOp.getURI()).writeLock();
- lock.lock();
- try {
- URIish uri = pushOp.getURI();
- PushOne pendingPushOp = getPendingPush(uri);
+ if (pendingPushOp != null) {
+ // There is one PushOp instance already pending to same URI.
- if (pendingPushOp != null) {
- // There is one PushOp instance already pending to same URI.
+ if (pendingPushOp.isRetrying()) {
+ // The one pending is one already retrying, so it should
+ // maintain it and add to it the refs of the one passed
+ // as parameter to the method.
- if (pendingPushOp.isRetrying()) {
- // The one pending is one already retrying, so it should
- // maintain it and add to it the refs of the one passed
- // as parameter to the method.
+ // This scenario would happen if a PushOp has started running
+ // and then before it failed due transport exception, another
+ // one to same URI started. The first one would fail and would
+ // be rescheduled, being present in pending list. When the
+ // second one fails, it will also be rescheduled and then,
+ // here, find out replication to its URI is already pending
+ // for retry (blocking).
+ pendingPushOp.addRefBatches(pushOp.getRefs());
+ pendingPushOp.addStates(pushOp.getStates());
+ pushOp.removeStates();
- // This scenario would happen if a PushOp has started running
- // and then before it failed due transport exception, another
- // one to same URI started. The first one would fail and would
- // be rescheduled, being present in pending list. When the
- // second one fails, it will also be rescheduled and then,
- // here, find out replication to its URI is already pending
- // for retry (blocking).
- pendingPushOp.addRefBatches(pushOp.getRefs());
- pendingPushOp.addStates(pushOp.getStates());
- pushOp.removeStates();
-
- } else {
- // The one pending is one that is NOT retrying, it was just
- // scheduled believing no problem would happen. The one pending
- // should be canceled, and this is done by setting its canceled
- // flag, removing it from pending list, and adding its refs to
- // the pushOp instance that should then, later, in this method,
- // be scheduled for retry.
-
- // Notice that the PushOp found pending will start running and,
- // when notifying it is starting (with pending lock protection),
- // it will see it was canceled and then it will do nothing with
- // pending list and it will not execute its run implementation.
- pendingPushOp.canceledByReplication();
- queue.pending.remove(uri);
-
- pushOp.addRefBatches(pendingPushOp.getRefs());
- pushOp.addStates(pendingPushOp.getStates());
- pendingPushOp.removeStates();
- }
- }
-
- if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
- queue.pending.put(uri, pushOp);
- switch (reason) {
- case COLLISION:
- @SuppressWarnings("unused")
- ScheduledFuture<?> ignored =
- pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
- break;
- case TRANSPORT_ERROR:
- case REPOSITORY_MISSING:
- default:
- failedStatus =
- RetryReason.REPOSITORY_MISSING.equals(reason)
- ? NON_EXISTING
- : REJECTED_OTHER_REASON;
- isFailed = true;
- if (pushOp.setToRetry()) {
- isRescheduled = true;
- replicationTasksStorage.get().reset(pushOp);
- @SuppressWarnings("unused")
- ScheduledFuture<?> ignored2 =
- pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
} else {
- pushOp.canceledByReplication();
- pushOp.retryDone();
+ // The one pending is one that is NOT retrying, it was just
+ // scheduled believing no problem would happen. The one pending
+ // should be canceled, and this is done by setting its canceled
+ // flag, removing it from pending list, and adding its refs to
+ // the pushOp instance that should then, later, in this method,
+ // be scheduled for retry.
+
+ // Notice that the PushOp found pending will start running and,
+ // when notifying it is starting (with pending lock protection),
+ // it will see it was canceled and then it will do nothing with
+ // pending list and it will not execute its run implementation.
+ pendingPushOp.canceledByReplication();
queue.pending.remove(uri);
- stateLog.error(
- "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
- pushOp.getStatesAsArray());
+
+ pushOp.addRefBatches(pendingPushOp.getRefs());
+ pushOp.addStates(pendingPushOp.getStates());
+ pendingPushOp.removeStates();
}
- break;
- }
- }
- } finally {
- lock.unlock();
+ }
+
+ if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
+ queue.pending.put(uri, pushOp);
+ switch (reason) {
+ case COLLISION:
+ @SuppressWarnings("unused")
+ ScheduledFuture<?> ignored =
+ pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+ break;
+ case TRANSPORT_ERROR:
+ case REPOSITORY_MISSING:
+ default:
+ status.failedStatus =
+ RetryReason.REPOSITORY_MISSING.equals(reason)
+ ? NON_EXISTING
+ : REJECTED_OTHER_REASON;
+ status.isFailed = true;
+ if (pushOp.setToRetry()) {
+ status.isRescheduled = true;
+ replicationTasksStorage.get().reset(pushOp);
+ @SuppressWarnings("unused")
+ ScheduledFuture<?> ignored2 =
+ pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
+ } else {
+ pushOp.canceledByReplication();
+ pushOp.retryDone();
+ queue.pending.remove(uri);
+ stateLog.error(
+ "Push to " + pushOp.getURI() + " cancelled after maximum number of retries",
+ pushOp.getStatesAsArray());
+ }
+ break;
+ }
+ }
+ return status;
+ });
+
+ if (status.isFailed) {
+ postReplicationFailedEvent(pushOp, status.failedStatus);
}
- if (isFailed) {
- postReplicationFailedEvent(pushOp, failedStatus);
- }
- if (isRescheduled) {
+ if (status.isRescheduled) {
postReplicationScheduledEvent(pushOp);
}
}
RunwayStatus requestRunway(PushOne op) {
- Lock lock = stateLock.get(op.getURI()).writeLock();
- lock.lock();
- try {
- if (op.wasCanceled()) {
- return RunwayStatus.canceled();
- }
- queue.pending.remove(op.getURI());
- PushOne inFlightOp = queue.inFlight.get(op.getURI());
- if (inFlightOp != null) {
- return RunwayStatus.denied(inFlightOp.getId());
- }
- op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op)));
- queue.inFlight.put(op.getURI(), op);
- } finally {
- lock.unlock();
- }
+ stateLock.withWriteLock(
+ op.getURI(),
+ () -> {
+ if (op.wasCanceled()) {
+ return RunwayStatus.canceled();
+ }
+ queue.pending.remove(op.getURI());
+ PushOne inFlightOp = queue.inFlight.get(op.getURI());
+ if (inFlightOp != null) {
+ return RunwayStatus.denied(inFlightOp.getId());
+ }
+ op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op)));
+ queue.inFlight.put(op.getURI(), op);
+ return null;
+ });
return RunwayStatus.allowed();
}
void notifyFinished(PushOne op) {
- Lock lock = stateLock.get(op.getURI()).writeLock();
- lock.lock();
- try {
- if (!op.isRetrying()) {
- replicationTasksStorage.get().finish(op);
- }
- queue.inFlight.remove(op.getURI());
- } finally {
- lock.unlock();
- }
+ stateLock.withWriteLock(
+ op.getURI(),
+ () -> {
+ if (!op.isRetrying()) {
+ replicationTasksStorage.get().finish(op);
+ }
+ queue.inFlight.remove(op.getURI());
+ return null;
+ });
}
public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {