Destination: extract StateLock class to simplify lock handling Change-Id: If46d2b1ece73e32a7463c02988ccd1d4e2f769aa
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index fc9d91d..3d7ae36 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -71,7 +71,6 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -85,6 +84,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Ref; @@ -106,8 +106,42 @@ Destination create(DestinationConfiguration config); } + private static class StateLock { + private final Striped<ReadWriteLock> stateLock; + + StateLock(int numStripes) { + stateLock = Striped.readWriteLock(numStripes); + } + + <V> V withWriteLock(URIish uri, Supplier<V> task) { + Lock lock = stateLock.get(uri).writeLock(); + lock.lock(); + try { + return task.get(); + } finally { + lock.unlock(); + } + } + + <V> V withReadLock(URIish uri, Supplier<V> task) { + Lock lock = stateLock.get(uri).readLock(); + lock.lock(); + try { + return task.get(); + } finally { + lock.unlock(); + } + } + } + + private static class RescheduleStatus { + boolean isRescheduled; + boolean isFailed; + RemoteRefUpdate.Status failedStatus; + } + private final ReplicationStateListener stateLog; - private final Striped<ReadWriteLock> stateLock; + private final StateLock stateLock; // writes are covered by the stateLock, but some reads are still // allowed without the lock private final Queue queue; @@ -168,7 +202,7 @@ ImmutableList<String> projects = cfg.getProjects(); int numStripes = projects.isEmpty() ? MAX_STRIPES : Math.min(projects.size(), MAX_STRIPES); - stateLock = Striped.readWriteLock(numStripes); + stateLock = new StateLock(numStripes); CurrentUser remoteUser; if (!cfg.getAuthGroupNames().isEmpty()) { @@ -290,13 +324,7 @@ // Callers may modify the provided opsMap concurrently, hence make a defensive copy of the // values to loop over them. for (PushOne pushOne : ImmutableList.copyOf(opsMap.values())) { - Lock lock = stateLock.get(pushOne.getURI()).writeLock(); - lock.lock(); - try { - pushOneFunction.apply(pushOne); - } finally { - lock.unlock(); - } + stateLock.withWriteLock(pushOne.getURI(), () -> pushOneFunction.apply(pushOne)); } } @@ -438,14 +466,7 @@ repLog.atInfo().log("scheduling replication %s:%s => %s", project, refs, uri); if (!config.replicatePermissions()) { - PushOne e; - Lock lock = stateLock.get(uri).readLock(); - lock.lock(); - try { - e = getPendingPush(uri); - } finally { - lock.unlock(); - } + PushOne e = stateLock.withReadLock(uri, () -> getPendingPush(uri)); if (e == null) { try (Repository git = gitManager.openRepository(project)) { try { @@ -467,38 +488,37 @@ } ImmutableSet<String> refsToSchedule = toSchedule.build(); - PushOne task; - Lock lock = stateLock.get(uri).writeLock(); - lock.lock(); - try { - task = getPendingPush(uri); - if (task == null) { - task = opFactory.create(project, uri); - task.addRefBatch(refsToSchedule); - task.addState(refsToSchedule, state); - @SuppressWarnings("unused") - ScheduledFuture<?> ignored = - pool.schedule(task, now ? 0 : config.getDelay(), TimeUnit.SECONDS); - queue.pending.put(uri, task); - repLog.atInfo().log( - "scheduled %s:%s => %s to run %s", - project, refsToSchedule, task, now ? "now" : "after " + config.getDelay() + "s"); - } else { - boolean added = task.addRefBatch(refsToSchedule); - task.addState(refsToSchedule, state); - String message = "consolidated %s:%s => %s with an existing pending push"; - if (added || !fromStorage) { - repLog.atInfo().log(message, project, refsToSchedule, task); - } else { - repLog.atFine().log(message, project, refsToSchedule, task); - } - } - for (String ref : refsToSchedule) { - state.increasePushTaskCount(project.get(), ref); - } - } finally { - lock.unlock(); - } + PushOne task = + stateLock.withWriteLock( + uri, + () -> { + PushOne t = getPendingPush(uri); + if (t == null) { + t = opFactory.create(project, uri); + t.addRefBatch(refsToSchedule); + t.addState(refsToSchedule, state); + @SuppressWarnings("unused") + ScheduledFuture<?> ignored = + pool.schedule(t, now ? 0 : config.getDelay(), TimeUnit.SECONDS); + queue.pending.put(uri, t); + repLog.atInfo().log( + "scheduled %s:%s => %s to run %s", + project, refsToSchedule, t, now ? "now" : "after " + config.getDelay() + "s"); + } else { + boolean added = t.addRefBatch(refsToSchedule); + t.addState(refsToSchedule, state); + String message = "consolidated %s:%s => %s with an existing pending push"; + if (added || !fromStorage) { + repLog.atInfo().log(message, project, refsToSchedule, t); + } else { + repLog.atFine().log(message, project, refsToSchedule, t); + } + } + for (String ref : refsToSchedule) { + state.increasePushTaskCount(project.get(), ref); + } + return t; + }); postReplicationScheduledEvent(task, refsToSchedule); } @@ -512,16 +532,14 @@ } void pushWasCanceled(PushOne pushOp) { - Set<ImmutableSet<String>> notAttemptedRefs = Collections.emptySet(); - Lock lock = stateLock.get(pushOp.getURI()).writeLock(); - lock.lock(); - try { - URIish uri = pushOp.getURI(); - queue.pending.remove(uri); - notAttemptedRefs = pushOp.getRefs(); - } finally { - lock.unlock(); - } + Set<ImmutableSet<String>> notAttemptedRefs = + stateLock.withWriteLock( + pushOp.getURI(), + () -> { + URIish uri = pushOp.getURI(); + queue.pending.remove(uri); + return pushOp.getRefs(); + }); pushOp.notifyNotAttempted(notAttemptedRefs); } @@ -561,131 +579,126 @@ * @param pushOp The PushOp instance to be scheduled. */ void reschedule(PushOne pushOp, RetryReason reason) { - boolean isRescheduled = false; - boolean isFailed = false; - RemoteRefUpdate.Status failedStatus = null; + RescheduleStatus status = new RescheduleStatus(); + stateLock.withWriteLock( + pushOp.getURI(), + () -> { + URIish uri = pushOp.getURI(); + PushOne pendingPushOp = getPendingPush(uri); - Lock lock = stateLock.get(pushOp.getURI()).writeLock(); - lock.lock(); - try { - URIish uri = pushOp.getURI(); - PushOne pendingPushOp = getPendingPush(uri); + if (pendingPushOp != null) { + // There is one PushOp instance already pending to same URI. - if (pendingPushOp != null) { - // There is one PushOp instance already pending to same URI. + if (pendingPushOp.isRetrying()) { + // The one pending is one already retrying, so it should + // maintain it and add to it the refs of the one passed + // as parameter to the method. - if (pendingPushOp.isRetrying()) { - // The one pending is one already retrying, so it should - // maintain it and add to it the refs of the one passed - // as parameter to the method. + // This scenario would happen if a PushOp has started running + // and then before it failed due transport exception, another + // one to same URI started. The first one would fail and would + // be rescheduled, being present in pending list. When the + // second one fails, it will also be rescheduled and then, + // here, find out replication to its URI is already pending + // for retry (blocking). + pendingPushOp.addRefBatches(pushOp.getRefs()); + pendingPushOp.addStates(pushOp.getStates()); + pushOp.removeStates(); - // This scenario would happen if a PushOp has started running - // and then before it failed due transport exception, another - // one to same URI started. The first one would fail and would - // be rescheduled, being present in pending list. When the - // second one fails, it will also be rescheduled and then, - // here, find out replication to its URI is already pending - // for retry (blocking). - pendingPushOp.addRefBatches(pushOp.getRefs()); - pendingPushOp.addStates(pushOp.getStates()); - pushOp.removeStates(); - - } else { - // The one pending is one that is NOT retrying, it was just - // scheduled believing no problem would happen. The one pending - // should be canceled, and this is done by setting its canceled - // flag, removing it from pending list, and adding its refs to - // the pushOp instance that should then, later, in this method, - // be scheduled for retry. - - // Notice that the PushOp found pending will start running and, - // when notifying it is starting (with pending lock protection), - // it will see it was canceled and then it will do nothing with - // pending list and it will not execute its run implementation. - pendingPushOp.canceledByReplication(); - queue.pending.remove(uri); - - pushOp.addRefBatches(pendingPushOp.getRefs()); - pushOp.addStates(pendingPushOp.getStates()); - pendingPushOp.removeStates(); - } - } - - if (pendingPushOp == null || !pendingPushOp.isRetrying()) { - queue.pending.put(uri, pushOp); - switch (reason) { - case COLLISION: - @SuppressWarnings("unused") - ScheduledFuture<?> ignored = - pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS); - break; - case TRANSPORT_ERROR: - case REPOSITORY_MISSING: - default: - failedStatus = - RetryReason.REPOSITORY_MISSING.equals(reason) - ? NON_EXISTING - : REJECTED_OTHER_REASON; - isFailed = true; - if (pushOp.setToRetry()) { - isRescheduled = true; - replicationTasksStorage.get().reset(pushOp); - @SuppressWarnings("unused") - ScheduledFuture<?> ignored2 = - pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES); } else { - pushOp.canceledByReplication(); - pushOp.retryDone(); + // The one pending is one that is NOT retrying, it was just + // scheduled believing no problem would happen. The one pending + // should be canceled, and this is done by setting its canceled + // flag, removing it from pending list, and adding its refs to + // the pushOp instance that should then, later, in this method, + // be scheduled for retry. + + // Notice that the PushOp found pending will start running and, + // when notifying it is starting (with pending lock protection), + // it will see it was canceled and then it will do nothing with + // pending list and it will not execute its run implementation. + pendingPushOp.canceledByReplication(); queue.pending.remove(uri); - stateLog.error( - "Push to " + pushOp.getURI() + " cancelled after maximum number of retries", - pushOp.getStatesAsArray()); + + pushOp.addRefBatches(pendingPushOp.getRefs()); + pushOp.addStates(pendingPushOp.getStates()); + pendingPushOp.removeStates(); } - break; - } - } - } finally { - lock.unlock(); + } + + if (pendingPushOp == null || !pendingPushOp.isRetrying()) { + queue.pending.put(uri, pushOp); + switch (reason) { + case COLLISION: + @SuppressWarnings("unused") + ScheduledFuture<?> ignored = + pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS); + break; + case TRANSPORT_ERROR: + case REPOSITORY_MISSING: + default: + status.failedStatus = + RetryReason.REPOSITORY_MISSING.equals(reason) + ? NON_EXISTING + : REJECTED_OTHER_REASON; + status.isFailed = true; + if (pushOp.setToRetry()) { + status.isRescheduled = true; + replicationTasksStorage.get().reset(pushOp); + @SuppressWarnings("unused") + ScheduledFuture<?> ignored2 = + pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES); + } else { + pushOp.canceledByReplication(); + pushOp.retryDone(); + queue.pending.remove(uri); + stateLog.error( + "Push to " + pushOp.getURI() + " cancelled after maximum number of retries", + pushOp.getStatesAsArray()); + } + break; + } + } + return status; + }); + + if (status.isFailed) { + postReplicationFailedEvent(pushOp, status.failedStatus); } - if (isFailed) { - postReplicationFailedEvent(pushOp, failedStatus); - } - if (isRescheduled) { + if (status.isRescheduled) { postReplicationScheduledEvent(pushOp); } } RunwayStatus requestRunway(PushOne op) { - Lock lock = stateLock.get(op.getURI()).writeLock(); - lock.lock(); - try { - if (op.wasCanceled()) { - return RunwayStatus.canceled(); - } - queue.pending.remove(op.getURI()); - PushOne inFlightOp = queue.inFlight.get(op.getURI()); - if (inFlightOp != null) { - return RunwayStatus.denied(inFlightOp.getId()); - } - op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op))); - queue.inFlight.put(op.getURI(), op); - } finally { - lock.unlock(); - } + stateLock.withWriteLock( + op.getURI(), + () -> { + if (op.wasCanceled()) { + return RunwayStatus.canceled(); + } + queue.pending.remove(op.getURI()); + PushOne inFlightOp = queue.inFlight.get(op.getURI()); + if (inFlightOp != null) { + return RunwayStatus.denied(inFlightOp.getId()); + } + op.notifyNotAttempted(op.setStartedRefs(replicationTasksStorage.get().start(op))); + queue.inFlight.put(op.getURI(), op); + return null; + }); return RunwayStatus.allowed(); } void notifyFinished(PushOne op) { - Lock lock = stateLock.get(op.getURI()).writeLock(); - lock.lock(); - try { - if (!op.isRetrying()) { - replicationTasksStorage.get().finish(op); - } - queue.inFlight.remove(op.getURI()); - } finally { - lock.unlock(); - } + stateLock.withWriteLock( + op.getURI(), + () -> { + if (!op.isRetrying()) { + replicationTasksStorage.get().finish(op); + } + queue.inFlight.remove(op.getURI()); + return null; + }); } public Map<ReplicateRefUpdate, String> getTaskNamesByReplicateRefUpdate() {