Merge branch 'stable-3.1' * stable-3.1: Do not remove replication tasks if they are retrying Change-Id: I5cbdcd29541f6d88fe93364cc4769b93123138b5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index fe5dbad..782ff4f 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@ } @Override + public int getDistributionInterval() { + return currentConfig.getDistributionInterval(); + } + + @Override public synchronized int getMaxRefsToLog() { return currentConfig.getMaxRefsToLog(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java index bb9097e..66251a5 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ConfigParser.java
@@ -14,6 +14,8 @@ package com.googlesource.gerrit.plugins.replication; +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.flogger.FluentLogger; @@ -53,6 +55,11 @@ continue; } + if (!c.getFetchRefSpecs().isEmpty()) { + repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName()); + continue; + } + // If destination for push is not set assume equal to source. for (RefSpec ref : c.getPushRefSpecs()) { if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 79cddca..38cc9e0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.gerrit.server.project.ProjectCache.noSuchProject; import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName; import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING; @@ -30,6 +31,7 @@ import com.google.gerrit.entities.BranchNameKey; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; +import com.google.gerrit.exceptions.StorageException; import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; @@ -65,6 +67,7 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -307,16 +310,12 @@ () -> { ProjectState projectState; try { - projectState = projectCache.checkedGet(project); - } catch (IOException e) { + projectState = projectCache.get(project).orElseThrow(noSuchProject(project)); + } catch (StorageException e) { repLog.atWarning().withCause(e).log( "Error reading project %s from cache", project); return false; } - if (projectState == null) { - repLog.atFine().log("Project %s does not exist", project); - throw new NoSuchProjectException(project); - } if (!projectState.statePermitsRead()) { repLog.atFine().log("Project %s does not permit read", project); return false; @@ -359,13 +358,10 @@ () -> { ProjectState projectState; try { - projectState = projectCache.checkedGet(project); - } catch (IOException e) { + projectState = projectCache.get(project).orElseThrow(noSuchProject(project)); + } catch (StorageException e) { return false; } - if (projectState == null) { - throw new NoSuchProjectException(project); - } return 
shouldReplicate(projectState, userProvider.get()); }) .call(); @@ -578,7 +574,9 @@ if (inFlightOp != null) { return RunwayStatus.denied(inFlightOp.getId()); } - replicationTasksStorage.get().start(op); + if (!replicationTasksStorage.get().start(op)) { + return RunwayStatus.deniedExternal(); + } inFlight.put(op.getURI(), op); } return RunwayStatus.allowed(); @@ -593,6 +591,17 @@ } } + public Set<String> getPrunableTaskNames() { + Set<String> names = new HashSet<>(); + for (PushOne push : pending.values()) { + if (!replicationTasksStorage.get().isWaiting(push)) { + repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI()); + names.add(push.toString()); + } + } + return names; + } + boolean wouldPushProject(Project.NameKey project) { if (!shouldReplicate(project)) { repLog.atFine().log("Skipping replication of project %s", project.get());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java index 4cc9974..5b24bf5 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@ } @Override + public int getDistributionInterval() { + return replicationConfig.getDistributionInterval(); + } + + @Override public String getVersion() { Hasher hasher = Hashing.murmur3_128().newHasher(); hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 4cff68e..199acce 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -54,6 +54,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.errors.NoRemoteRepositoryException; @@ -319,6 +320,8 @@ if (status.isCanceled()) { repLog.atInfo().log( "PushOp for replication to %s was canceled and thus won't be rescheduled", uri); + } else if (status.isExternalInflight()) { + repLog.atInfo().log("PushOp for replication to %s was denied externally", uri); } else { repLog.atInfo().log( "Rescheduling replication to %s to avoid collision with the in-flight push [%s].", @@ -483,8 +486,8 @@ private List<RemoteRefUpdate> generateUpdates(Transport tn) throws IOException, PermissionBackendException { - ProjectState projectState = projectCache.checkedGet(projectName); - if (projectState == null) { + Optional<ProjectState> projectState = projectCache.get(projectName); + if (!projectState.isPresent()) { return Collections.emptyList(); } @@ -493,7 +496,7 @@ boolean filter; PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName); try { - projectState.checkStatePermitsRead(); + projectState.get().checkStatePermitsRead(); forProject.check(ProjectPermission.READ); filter = false; } catch (AuthException | ResourceConflictException e) { @@ -513,7 +516,11 @@ } local = n; } - local = forProject.filter(local, git, RefFilterOptions.builder().setFilterMeta(true).build()); + local = + forProject + .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build()) + .stream() + .collect(toMap(Ref::getName, r -> r)); } List<RemoteRefUpdate> remoteUpdatesList =
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index d3d64db..99e5ee4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@ boolean isDefaultForceUpdate(); /** + * Returns the interval in seconds for running task distribution. + * + * @return number of seconds, zero if never. + */ + int getDistributionInterval(); + + /** * Returns the maximum number of ref-specs to log into the replication_log whenever a push * operation is completed against a replication end. *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index bea30d4..2631cbe 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -86,6 +86,11 @@ } @Override + public int getDistributionInterval() { + return config.getInt("replication", "distributionInterval", 0); + } + + @Override public int getMaxRefsToLog() { return maxRefsToLog; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 3807e00..a8ffeec 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@ package com.googlesource.gerrit.plugins.replication; +import static java.util.concurrent.TimeUnit.SECONDS; + import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; @@ -35,6 +37,8 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.eclipse.jgit.transport.URIish; /** Manages automatic replication to remote repositories. */ @@ -49,6 +53,7 @@ private final ReplicationStateListener stateLog; + private final ReplicationConfig replConfig; private final WorkQueue workQueue; private final DynamicItem<EventDispatcher> dispatcher; private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency @@ -56,14 +61,17 @@ private volatile boolean running; private volatile boolean replaying; private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; + private Distributor distributor; @Inject ReplicationQueue( + ReplicationConfig rc, WorkQueue wq, Provider<ReplicationDestinations> rd, DynamicItem<EventDispatcher> dis, ReplicationStateListeners sl, ReplicationTasksStorage rts) { + replConfig = rc; workQueue = wq; dispatcher = dis; destinations = rd; @@ -80,12 +88,14 @@ replicationTasksStorage.resetAll(); firePendingEvents(); fireBeforeStartupEvents(); + distributor = new Distributor(workQueue); } } @Override public void stop() { running = false; + distributor.stop(); int discarded = destinations.get().shutdown(); if (discarded > 0) { repLog.atWarning().log("Canceled %d replication events during shutdown", discarded); @@ -109,17 +119,17 @@ @VisibleForTesting public void scheduleFullSync( Project.NameKey project, String urlMatch, ReplicationState state, boolean now) { - fire(project, urlMatch, PushOne.ALL_REFS, state, now); + fire(project, urlMatch, PushOne.ALL_REFS, state, now, false); } @Override public void 
onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { - fire(event.getProjectName(), event.getRefName()); + fire(event.getProjectName(), event.getRefName(), false); } - private void fire(String projectName, String refName) { + private void fire(String projectName, String refName, boolean isPersisted) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); - fire(Project.nameKey(projectName), null, refName, state, false); + fire(Project.nameKey(projectName), null, refName, state, false, isPersisted); state.markAllPushTasksScheduled(); } @@ -128,7 +138,8 @@ String urlMatch, String refName, ReplicationState state, - boolean now) { + boolean now, + boolean isPersisted) { if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", @@ -140,8 +151,10 @@ for (Destination cfg : destinations.get().getAll(FilterType.ALL)) { if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) { for (URIish uri : cfg.getURIs(project, urlMatch)) { - replicationTasksStorage.create( - new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName())); + if (!isPersisted) { + replicationTasksStorage.create( + new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName())); + } cfg.schedule(project, refName, uri, state, now); } } else { @@ -159,7 +172,7 @@ String eventKey = String.format("%s:%s", t.project, t.ref); if (!eventsReplayed.contains(eventKey)) { repLog.atInfo().log("Firing pending task %s", eventKey); - fire(t.project, t.ref); + fire(t.project, t.ref, true); eventsReplayed.add(eventKey); } } @@ -168,6 +181,30 @@ } } + private void pruneCompleted() { + // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes. + // We also cannot access them by taskId since PushOnes don't have a taskId, they do have + // an Id, but it is not the id assigned to the task in the queues. 
The tasks in the queue + // do use the same name as returned by toString() though, so that can be used to correlate + // PushOnes with queue tasks despite their wrappers. + Set<String> prunableTaskNames = new HashSet<>(); + for (Destination destination : destinations.get().getAll(FilterType.ALL)) { + prunableTaskNames.addAll(destination.getPrunableTaskNames()); + } + + for (WorkQueue.Task<?> task : workQueue.getTasks()) { + WorkQueue.Task.State state = task.getState(); + if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) { + if (task instanceof WorkQueue.ProjectTask) { + if (prunableTaskNames.contains(task.toString())) { + repLog.atFine().log("Pruning externally completed task: %s", task); + task.cancel(false); + } + } + } + } + } + @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey p = Project.nameKey(event.getProjectName()); @@ -188,7 +225,7 @@ String eventKey = String.format("%s:%s", event.projectName(), event.refName()); if (!eventsReplayed.contains(eventKey)) { repLog.atInfo().log("Firing pending task %s", event); - fire(event.projectName(), event.refName()); + fire(event.projectName(), event.refName(), false); eventsReplayed.add(eventKey); } } @@ -205,4 +242,49 @@ public abstract String refName(); } + + protected class Distributor implements WorkQueue.CancelableRunnable { + public ScheduledThreadPoolExecutor executor; + public ScheduledFuture<?> future; + + public Distributor(WorkQueue wq) { + int distributionInterval = replConfig.getDistributionInterval(); + if (distributionInterval > 0) { + executor = wq.createQueue(1, "Replication Distribution", false); + future = + executor.scheduleWithFixedDelay( + this, distributionInterval, distributionInterval, SECONDS); + } + } + + @Override + public void run() { + if (!running) { + return; + } + try { + firePendingEvents(); + pruneCompleted(); + } catch (Exception e) { + repLog.atSevere().withCause(e).log("error distributing tasks"); + } + } + + 
@Override + public void cancel() { + future.cancel(true); + } + + public void stop() { + if (executor != null) { + cancel(); + executor.getQueue().remove(this); + } + } + + @Override + public String toString() { + return "Replication Distributor"; + } + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index 3e6c4d4..554f8bb 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -25,6 +25,7 @@ import com.google.inject.Singleton; import java.io.IOException; import java.nio.file.DirectoryStream; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -46,13 +47,19 @@ * task: * * <p><code> - * .../building/<tmp_name> new replication tasks under construction - * .../running/<sha1> running replication tasks - * .../waiting/<sha1> outstanding replication tasks + * .../building/<tmp_name> new replication tasks under construction + * .../running/<uri_sha1>/ lock for URI + * .../running/<uri_sha1>/<task_sha1> running replication tasks + * .../waiting/<task_sha1_NN_shard>/<task_sha1> outstanding replication tasks * </code> * + * <p>The URI lock is acquired by creating the directory and released by removing it. + * * <p>Tasks are moved atomically via a rename between those directories to indicate the current * state of each task. + * + * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent + * failures when moving tasks to and from the shard directories from different hosts concurrently. 
*/ @Singleton public class ReplicationTasksStorage { @@ -60,26 +67,50 @@ private boolean disableDeleteForTesting; - public static class ReplicateRefUpdate { - public final String project; + public static class ReplicateRefUpdate extends UriUpdate { public final String ref; - public final String uri; - public final String remote; - public ReplicateRefUpdate(PushOne push, String ref) { - this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName()); + public ReplicateRefUpdate(UriUpdate update, String ref) { + this(update.project, ref, update.uri, update.remote); } public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) { - this.project = project; + this(project, ref, uri.toASCIIString(), remote); + } + + protected ReplicateRefUpdate(String project, String ref, String uri, String remote) { + super(project, uri, remote); this.ref = ref; - this.uri = uri.toASCIIString(); + } + + @Override + public String toString() { + return "ref-update " + ref + " (" + super.toString() + ")"; + } + } + + public static class UriUpdate { + public final String project; + public final String uri; + public final String remote; + + public UriUpdate(PushOne push) { + this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName()); + } + + public UriUpdate(String project, URIish uri, String remote) { + this(project, uri.toASCIIString(), remote); + } + + public UriUpdate(String project, String uri, String remote) { + this.project = project; + this.uri = uri; this.remote = remote; } @Override public String toString() { - return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote; + return "uri-update " + project + " uri:" + uri + " remote:" + remote; } } @@ -107,28 +138,60 @@ this.disableDeleteForTesting = deleteDisabled; } - public synchronized void start(PushOne push) { - for (String ref : push.getRefs()) { - new Task(new ReplicateRefUpdate(push, ref)).start(); + public synchronized boolean start(PushOne push) { + 
UriLock lock = new UriLock(push); + if (!lock.acquire()) { + return false; } + + boolean started = false; + for (String ref : push.getRefs()) { + started = new Task(lock, ref).start() || started; + } + + if (!started) { // No tasks left, likely replicated externally + lock.release(); + } + return started; } public synchronized void reset(PushOne push) { + UriLock lock = new UriLock(push); for (String ref : push.getRefs()) { - new Task(new ReplicateRefUpdate(push, ref)).reset(); + new Task(lock, ref).reset(); } + lock.release(); } public synchronized void resetAll() { - for (ReplicateRefUpdate r : listRunning()) { - new Task(r).reset(); + try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) { + for (Path dir : dirs) { + UriLock lock = null; + for (ReplicateRefUpdate u : list(dir)) { + if (lock == null) { + lock = new UriLock(u); + } + new Task(u).reset(); + } + if (lock != null) { + lock.release(); + } + } + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while aborting running tasks"); } } - public synchronized void finish(PushOne push) { - for (String ref : push.getRefs()) { - new Task(new ReplicateRefUpdate(push, ref)).finish(); + public boolean isWaiting(PushOne push) { + return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting); + } + + public void finish(PushOne push) { + UriLock lock = new UriLock(push); + for (ReplicateRefUpdate r : list(lock.runningDir)) { + new Task(lock, r.ref).finish(); } + lock.release(); } public synchronized List<ReplicateRefUpdate> listWaiting() { @@ -180,20 +243,73 @@ } } + private class UriLock { + public final UriUpdate update; + public final String uriKey; + public final Path runningDir; + + public UriLock(PushOne push) { + this(new UriUpdate(push)); + } + + public UriLock(UriUpdate update) { + this.update = update; + uriKey = sha1(update.uri).name(); + runningDir = createDir(runningUpdates).resolve(uriKey); + } + + public boolean acquire() { + 
try { + logger.atFine().log("MKDIR %s %s", runningDir, updateLog()); + Files.createDirectory(runningDir); + return true; + } catch (FileAlreadyExistsException e) { + return false; // already running, likely externally + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey); + return true; // safer to risk a duplicate than to skip it + } + } + + public void release() { + if (disableDeleteForTesting) { + logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog()); + return; + } + + try { + logger.atFine().log("DELETE %s %s", runningDir, updateLog()); + Files.delete(runningDir); + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey); + } + } + + private String updateLog() { + return String.format("(%s => %s)", update.project, update.uri); + } + } + private class Task { public final ReplicateRefUpdate update; - public final String json; public final String taskKey; public final Path running; public final Path waiting; - public Task(ReplicateRefUpdate update) { - this.update = update; - json = GSON.toJson(update) + "\n"; + public Task(ReplicateRefUpdate r) { + this(new UriLock(r), r.ref); + } + + public Task(PushOne push, String ref) { + this(new UriLock(push), ref); + } + + public Task(UriLock lock, String ref) { + update = new ReplicateRefUpdate(lock.update, ref); String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote; taskKey = sha1(key).name(); - running = createDir(runningUpdates).resolve(taskKey); - waiting = createDir(waitingUpdates).resolve(taskKey); + running = lock.runningDir.resolve(taskKey); + waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey); } public String create() { @@ -201,6 +317,7 @@ return taskKey; } + String json = GSON.toJson(update) + "\n"; try { Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null); logger.atFine().log("CREATE %s %s", tmp, 
updateLog()); @@ -213,14 +330,19 @@ return taskKey; } - public void start() { + public boolean start() { rename(waiting, running); + return Files.exists(running); } public void reset() { rename(running, waiting); } + public boolean isWaiting() { + return Files.exists(waiting); + } + public void finish() { if (disableDeleteForTesting) { logger.atFine().log("DELETE %s %s DISABLED", running, updateLog()); @@ -231,7 +353,7 @@ logger.atFine().log("DELETE %s %s", running, updateLog()); Files.delete(running); } catch (IOException e) { - logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); + logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java index bcb1e2f..f7071d8 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@ return new RunwayStatus(false, inFlightPushId); } + public static RunwayStatus deniedExternal() { + return new RunwayStatus(false, -1); + } + private final boolean allowed; private final int inFlightPushId; @@ -43,6 +47,10 @@ return !allowed && inFlightPushId == 0; } + public boolean isExternalInflight() { + return !allowed && inFlightPushId == -1; + } + public int getInFlightPushId() { return inFlightPushId; }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 0aad73b..fc3352d 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@ : Timeout for SSH connections. If 0, there is no timeout and the client waits indefinitely. By default, 2 minutes. +replication.distributionInterval +: Interval in seconds for running the replication distributor. When + run, the replication distributor will add all persisted waiting tasks + to the queue to ensure that externally loaded tasks are visible to + the current process. If zero, turn off the replication distributor. By + default, zero. + + Turning this on is likely only useful when there are other processes + (such as other masters in the same cluster) writing to the same + persistence store. To ensure that updates are seen well before their + replicationDelay expires when the distributor is used, the recommended + value for this is approximately the smallest remote.NAME.replicationDelay + divided by 5. + replication.lockErrorMaxRetries : Number of times to retry a replication operation if a lock error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index c09fcd1..94f0dc4 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -245,9 +246,9 @@ return push; } - private void setupProjectCacheMock() throws IOException { + private void setupProjectCacheMock() { projectCacheMock = mock(ProjectCache.class); - when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock); + when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock)); } private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java index efacae7..79b05cc 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@ assertThatContainsDestination(destinations, remoteName1, remoteUrl1); assertThatContainsDestination(destinations, remoteName2, remoteUrl2); } + + @Test + public void shouldSkipFetchRefSpecs() throws Exception { + FileBasedConfig config = newReplicationConfig(); + String pushRemote = "pushRemote"; + final String aRemoteURL = "ssh://somewhere/${name}.git"; + config.setString("remote", pushRemote, "url", aRemoteURL); + + String fetchRemote = "fetchRemote"; + config.setString("remote", fetchRemote, "url", aRemoteURL); + config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*"); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(newReplicationFileBasedConfig()); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + + assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL); + } }