Merge branch 'stable-3.1' into stable-3.2

* stable-3.1:
  Call retryDone() when giving up after lock failures
  Fix issue with task cleanup after retry

Change-Id: I6dbeaa0d21545a1903bdb11c5de5d9e8f72079c5
diff --git a/BUILD b/BUILD index 1bf917d..ba4a21d 100644 --- a/BUILD +++ b/BUILD
@@ -21,6 +21,7 @@ junit_tests( name = "replication_tests", + timeout = "long", srcs = glob([ "src/test/java/**/*Test.java", ]),
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index fe5dbad..782ff4f 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@ } @Override + public int getDistributionInterval() { + return currentConfig.getDistributionInterval(); + } + + @Override public synchronized int getMaxRefsToLog() { return currentConfig.getMaxRefsToLog(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java index fa26e82..2436fee 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -31,7 +31,7 @@ } private final RemoteConfig config; - private final DestinationsCollection destinations; + private final ReplicationDestinations destinations; private final DynamicItem<AdminApiFactory> adminApiFactory; private final Project.NameKey project; private final String head; @@ -39,7 +39,7 @@ @Inject CreateProjectTask( RemoteConfig config, - DestinationsCollection destinations, + ReplicationDestinations destinations, DynamicItem<AdminApiFactory> adminApiFactory, @Assisted Project.NameKey project, @Assisted String head) { @@ -63,7 +63,8 @@ return true; } - repLog.warn("Cannot create new project {} on remote site {}.", projectName, replicateURI); + repLog.atWarning().log( + "Cannot create new project %s on remote site %s.", projectName, replicateURI); return false; } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java index 4617672..8ea7227 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -55,7 +55,7 @@ return; } - repLog.warn("Cannot delete project {} on remote site {}.", project, replicateURI); + repLog.atWarning().log("Cannot delete project %s on remote site %s.", project, replicateURI); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 62ea42b..67dae7e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.gerrit.server.project.ProjectCache.noSuchProject; import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName; import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING; @@ -30,6 +31,7 @@ import com.google.gerrit.entities.BranchNameKey; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; +import com.google.gerrit.exceptions.StorageException; import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; @@ -51,6 +53,7 @@ import com.google.gerrit.server.project.ProjectCache; import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.util.RequestContext; +import com.google.gerrit.util.logging.NamedFluentLogger; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Provider; @@ -64,10 +67,13 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -81,10 +87,9 @@ import org.eclipse.jgit.transport.RemoteConfig; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; public class Destination { - private static final Logger repLog = ReplicationQueue.repLog; + private static final NamedFluentLogger repLog = ReplicationQueue.repLog; private static final 
String PROJECT_NOT_AVAILABLE = "source project %s not available"; @@ -94,7 +99,9 @@ private final ReplicationStateListener stateLog; private final Object stateLock = new Object(); - private final Map<URIish, PushOne> pending = new HashMap<>(); + // writes are covered by the stateLock, but some reads are still + // allowed without the lock + private final ConcurrentMap<URIish, PushOne> pending = new ConcurrentHashMap<>(); private final Map<URIish, PushOne> inFlight = new HashMap<>(); private final PushOne.Factory opFactory; private final DeleteProjectTask.Factory deleteProjectFactory; @@ -156,7 +163,7 @@ builder.add(g.getUUID()); addRecursiveParents(g.getUUID(), builder, groupIncludeCache); } else { - repLog.warn("Group \"{}\" not recognized, removing from authGroup", name); + repLog.atWarning().log("Group \"%s\" not recognized, removing from authGroup", name); } } remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build())); @@ -240,11 +247,9 @@ int numInFlight = inFlight.size(); if (numPending > 0 || numInFlight > 0) { - repLog.warn( - "Cancelling replication events (pending={}, inFlight={}) for destination {}", - numPending, - numInFlight, - getRemoteConfigName()); + repLog.atWarning().log( + "Cancelling replication events (pending=%d, inFlight=%d) for destination %s", + numPending, numInFlight, getRemoteConfigName()); foreachPushOp( pending, @@ -280,7 +285,8 @@ if (!config.replicateHiddenProjects() && state.getProject().getState() == com.google.gerrit.extensions.client.ProjectState.HIDDEN) { - repLog.debug("Project {} is hidden and replication of hidden projects is disabled", name); + repLog.atFine().log( + "Project %s is hidden and replication of hidden projects is disabled", name); return false; } @@ -293,10 +299,9 @@ permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck); return true; } catch (AuthException e) { - repLog.debug( - "Project {} is not visible to current user {}", - name, - 
user.getUserName().orElse("unknown")); + repLog.atFine().log( + "Project %s is not visible to current user %s", + name, user.getUserName().orElse("unknown")); return false; } } @@ -309,21 +314,22 @@ () -> { ProjectState projectState; try { - projectState = projectCache.checkedGet(project); - } catch (IOException e) { - repLog.warn("Error reading project {} from cache", project, e); + projectState = projectCache.get(project).orElseThrow(noSuchProject(project)); + } catch (StorageException e) { + repLog.atWarning().withCause(e).log( + "Error reading project %s from cache", project); return false; } if (projectState == null) { - repLog.debug("Project {} does not exist", project); + repLog.atFine().log("Project %s does not exist", project); throw new NoSuchProjectException(project); } if (!projectState.statePermitsRead()) { - repLog.debug("Project {} does not permit read", project); + repLog.atFine().log("Project %s does not permit read", project); return false; } if (!shouldReplicate(projectState, userProvider.get())) { - repLog.debug("Project {} should not be replicated", project); + repLog.atFine().log("Project %s should not be replicated", project); return false; } if (PushOne.ALL_REFS.equals(ref)) { @@ -339,11 +345,9 @@ .ref(ref) .check(RefPermission.READ); } catch (AuthException e) { - repLog.debug( - "Ref {} on project {} is not visible to calling user {}", - ref, - project, - userProvider.get().getUserName().orElse("unknown")); + repLog.atFine().log( + "Ref %s on project %s is not visible to calling user %s", + ref, project, userProvider.get().getUserName().orElse("unknown")); return false; } return true; @@ -365,13 +369,10 @@ () -> { ProjectState projectState; try { - projectState = projectCache.checkedGet(project); - } catch (IOException e) { + projectState = projectCache.get(project).orElseThrow(noSuchProject(project)); + } catch (StorageException e) { return false; } - if (projectState == null) { - throw new NoSuchProjectException(project); - } return 
shouldReplicate(projectState, userProvider.get()); }) .call(); @@ -391,10 +392,10 @@ void schedule( Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) { if (!shouldReplicate(project, ref, state)) { - repLog.debug("Not scheduling replication {}:{} => {}", project, ref, uri); + repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri); return; } - repLog.info("scheduling replication {}:{} => {}", project, ref, uri); + repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri); if (!config.replicatePermissions()) { PushOne e; @@ -436,7 +437,8 @@ task.addState(ref, state); } state.increasePushTaskCount(project.get(), ref); - repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay()); + repLog.atInfo().log( + "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay()); } } @@ -562,6 +564,7 @@ pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES); } else { pushOp.canceledByReplication(); + pushOp.retryDone(); pending.remove(uri); stateLog.error( "Push to " + pushOp.getURI() + " cancelled after maximum number of retries", @@ -598,13 +601,24 @@ } } + public Set<String> getPrunableTaskNames() { + Set<String> names = new HashSet<>(); + for (PushOne push : pending.values()) { + if (!replicationTasksStorage.get().isWaiting(push)) { + repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI()); + names.add(push.toString()); + } + } + return names; + } + boolean wouldPush(URIish uri, Project.NameKey project, String ref) { return matches(uri, project) && wouldPushProject(project) && wouldPushRef(ref); } boolean wouldPushProject(Project.NameKey project) { if (!shouldReplicate(project)) { - repLog.debug("Skipping replication of project {}", project.get()); + repLog.atFine().log("Skipping replication of project %s", project.get()); return false; } @@ -616,7 +630,8 @@ boolean matches = (new 
ReplicationFilter(projects)).matches(project); if (!matches) { - repLog.debug("Skipping replication of project {}; does not match filter", project.get()); + repLog.atFine().log( + "Skipping replication of project %s; does not match filter", project.get()); } return matches; } @@ -627,7 +642,7 @@ boolean wouldPushRef(String ref) { if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) { - repLog.debug("Skipping push of ref {}; it is a meta ref", ref); + repLog.atFine().log("Skipping push of ref %s; it is a meta ref", ref); return false; } if (PushOne.ALL_REFS.equals(ref)) { @@ -638,7 +653,7 @@ return true; } } - repLog.debug("Skipping push of ref {}; it does not match push ref specs", ref); + repLog.atFine().log("Skipping push of ref %s; it does not match push ref specs", ref); return false; } @@ -688,7 +703,7 @@ } else if (remoteNameStyle.equals("basenameOnly")) { name = FilenameUtils.getBaseName(name); } else if (!remoteNameStyle.equals("slash")) { - repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle); + repLog.atFine().log("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle); } String replacedPath = replaceName(template.getPath(), name, isSingleProjectMatch()); return (replacedPath != null) ? template.setPath(replacedPath) : template; @@ -774,7 +789,7 @@ try { eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { - repLog.error("error posting event", e); + repLog.atSevere().withCause(e).log("error posting event"); } } } @@ -788,7 +803,7 @@ try { eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { - repLog.error("error posting event", e); + repLog.atSevere().withCause(e).log("error posting event"); } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java index 4050c9c..9b6d431 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
@@ -14,6 +14,8 @@ package com.googlesource.gerrit.plugins.replication; +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.flogger.FluentLogger; @@ -54,6 +56,11 @@ continue; } + if (!c.getFetchRefSpecs().isEmpty()) { + repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName()); + continue; + } + // If destination for push is not set assume equal to source. for (RefSpec ref : c.getPushRefSpecs()) { if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java index eaf5b27..747c0f6 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -107,7 +107,7 @@ try { uri = new URIish(url); } catch (URISyntaxException e) { - repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage()); + repLog.atWarning().log("adminURL '%s' is invalid: %s", url, e.getMessage()); continue; } @@ -115,13 +115,14 @@ String path = replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch()); if (path == null) { - repLog.warn("adminURL {} does not contain ${name}", uri); + repLog.atWarning().log("adminURL %s does not contain ${name}", uri); continue; } uri = uri.setPath(path); if (!isSSH(uri)) { - repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri); + repLog.atWarning().log( + "adminURL '%s' is invalid: only SSH and HTTP are supported", uri); continue; } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java index 4cc9974..5b24bf5 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@ } @Override + public int getDistributionInterval() { + return replicationConfig.getDistributionInterval(); + } + + @Override public String getVersion() { Hasher hasher = Hashing.murmur3_128().newHasher(); hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java index 66130f9..fa81dd0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -62,34 +62,34 @@ @Override public boolean createProject(Project.NameKey project, String head) { - repLog.info("Creating project {} on {}", project, uri); + repLog.atInfo().log("Creating project %s on %s", project, uri); String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get())); try { return httpClient .execute(new HttpPut(url), new HttpResponseHandler(), getContext()) .isSuccessful(); } catch (IOException e) { - repLog.error("Couldn't perform project creation on {}", uri, e); + repLog.atSevere().withCause(e).log("Couldn't perform project creation on %s", uri); return false; } } @Override public boolean deleteProject(Project.NameKey project) { - repLog.info("Deleting project {} on {}", project, uri); + repLog.atInfo().log("Deleting project %s on %s", project, uri); String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get())); try { httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext()); return true; } catch (IOException e) { - repLog.error("Couldn't perform project deletion on {}", uri, e); + repLog.atSevere().withCause(e).log("Couldn't perform project deletion on %s", uri); } return false; } @Override public boolean updateHead(Project.NameKey project, String newHead) { - repLog.info("Updating head of {} on {}", project, uri); + repLog.atInfo().log("Updating head of %s on %s", project, uri); String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get())); try { HttpPut req = new HttpPut(url); @@ -98,7 +98,7 @@ httpClient.execute(req, new HttpResponseHandler(), getContext()); return true; } catch (IOException e) { - repLog.error("Couldn't perform update head on {}", uri, e); + repLog.atSevere().withCause(e).log("Couldn't perform update head on %s", uri); } return false; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java index da960e6..b092363 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -43,9 +43,9 @@ u.disableRefLog(); u.link(head); } - repLog.info("Created local repository: {}", uri); + repLog.atInfo().log("Created local repository: %s", uri); } catch (IOException e) { - repLog.error("Error creating local repository {}", uri.getPath(), e); + repLog.atSevere().withCause(e).log("Error creating local repository %s", uri.getPath()); return false; } return true; @@ -55,9 +55,9 @@ public boolean deleteProject(Project.NameKey project) { try { recursivelyDelete(new File(uri.getPath())); - repLog.info("Deleted local repository: {}", uri); + repLog.atInfo().log("Deleted local repository: %s", uri); } catch (IOException e) { - repLog.error("Error deleting local repository {}:\n", uri.getPath(), e); + repLog.atSevere().withCause(e).log("Error deleting local repository %s:\n", uri.getPath()); return false; } return true; @@ -71,7 +71,8 @@ u.link(newHead); } } catch (IOException e) { - repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e); + repLog.atSevere().withCause(e).log( + "Failed to update HEAD of repository %s to %s", uri.getPath(), newHead); return false; } return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index e63a350..ebc8889 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.common.flogger.LazyArgs.lazy; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -37,6 +38,7 @@ import com.google.gerrit.server.git.ProjectRunnable; import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning; import com.google.gerrit.server.ioutil.HexFormat; +import com.google.gerrit.server.logging.TraceContext; import com.google.gerrit.server.permissions.PermissionBackend; import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions; import com.google.gerrit.server.permissions.PermissionBackendException; @@ -56,6 +58,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.errors.NoRemoteRepositoryException; @@ -76,7 +79,6 @@ import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.Transport; import org.eclipse.jgit.transport.URIish; -import org.slf4j.MDC; /** * A push to remote operation started by {@link GitReferenceUpdatedListener}. @@ -87,7 +89,7 @@ class PushOne implements ProjectRunnable, CanceledWhileRunning, UriUpdates { private final ReplicationStateListener stateLog; static final String ALL_REFS = "..all.."; - static final String ID_MDC_KEY = "pushOneId"; + static final String ID_KEY = "pushOneId"; // The string here needs to match the one returned by Git(versions prior to 2014) server. 
// See: @@ -180,17 +182,16 @@ @Override public void cancel() { - repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI()); + repLog.atInfo().log("Replication [%s] to %s was canceled", HexFormat.fromInt(id), getURI()); canceledByReplication(); pool.pushWasCanceled(this); } @Override public void setCanceledWhileRunning() { - repLog.info( - "Replication [{}] to {} was canceled while being executed", - HexFormat.fromInt(id), - getURI()); + repLog.atInfo().log( + "Replication [%s] to %s was canceled while being executed", + HexFormat.fromInt(id), getURI()); canceledWhileRunning.set(true); } @@ -229,7 +230,7 @@ return maxRetries == 0 || retryCount <= maxRetries; } - private void retryDone() { + void retryDone() { this.retrying = false; } @@ -250,9 +251,9 @@ if (ALL_REFS.equals(ref)) { delta.clear(); pushAllRefs = true; - repLog.trace("Added all refs for replication to {}", uri); + repLog.atFinest().log("Added all refs for replication to %s", uri); } else if (!pushAllRefs && delta.add(ref)) { - repLog.trace("Added ref {} for replication to {}", ref, uri); + repLog.atFinest().log("Added ref %s for replication to %s", ref, uri); } } @@ -330,28 +331,33 @@ } private void runPushOperation() { + try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) { + doRunPushOperation(); + } + } + + private void doRunPushOperation() { // Lock the queue, and remove ourselves, so we can't be modified once // we start replication (instead a new instance, with the same URI, is // created and scheduled for a future point in time.) 
// - MDC.put(ID_MDC_KEY, HexFormat.fromInt(id)); RunwayStatus status = pool.requestRunway(this); isCollision = false; if (!status.isAllowed()) { if (status.isCanceled()) { - repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri); + repLog.atInfo().log( + "PushOp for replication to %s was canceled and thus won't be rescheduled", uri); } else { - repLog.info( - "Rescheduling replication to {} to avoid collision with the in-flight push [{}].", - uri, - HexFormat.fromInt(status.getInFlightPushId())); + repLog.atInfo().log( + "Rescheduling replication to %s to avoid collision with the in-flight push [%s].", + uri, HexFormat.fromInt(status.getInFlightPushId())); pool.reschedule(this, Destination.RetryReason.COLLISION); isCollision = true; } return; } - repLog.info("Replication to {} started...", uri); + repLog.atInfo().log("Replication to %s started...", uri); Timer1.Context<String> destinationContext = metrics.start(config.getName()); try { long startedAt = destinationContext.getStartTime(); @@ -366,12 +372,9 @@ config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed); } retryDone(); - repLog.info( - "Replication to {} completed in {}ms, {}ms delay, {} retries", - uri, - elapsed, - delay, - retryCount); + repLog.atInfo().log( + "Replication to %s completed in %dms, %dms delay, %d retries", + uri, elapsed, delay, retryCount); } catch (RepositoryNotFoundException e) { stateLog.error( "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(), @@ -388,7 +391,7 @@ || msg.contains("unavailable")) { createRepository(); } else { - repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg); + repLog.atSevere().log("Cannot replicate %s; Remote repository error: %s", projectName, msg); } } catch (NoRemoteRepositoryException e) { @@ -398,10 +401,10 @@ } catch (TransportException e) { Throwable cause = e.getCause(); if (cause instanceof JSchException && 
cause.getMessage().startsWith("UnknownHostKey:")) { - repLog.error("Cannot replicate to {}: {}", uri, cause.getMessage()); + repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage()); } else if (e instanceof UpdateRefFailureException) { updateRefRetryCount++; - repLog.error("Cannot replicate to {} due to a lock or write ref failure", uri); + repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri); // The remote push operation should be retried. if (updateRefRetryCount <= maxUpdateRefRetries) { @@ -412,17 +415,15 @@ } } else { retryDone(); - repLog.error( - "Giving up after {} '{}' failures during replication to {}", - updateRefRetryCount, - e.getMessage(), - uri); + repLog.atSevere().log( + "Giving up after %d '%s' failures during replication to %s", + updateRefRetryCount, e.getMessage(), uri); } } else { if (canceledWhileRunning.get()) { logCanceledWhileRunningException(e); } else { - repLog.error("Cannot replicate to {}", uri, e); + repLog.atSevere().withCause(e).log("Cannot replicate to %s", uri); // The remote push operation should be retried. pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR); } @@ -440,7 +441,7 @@ } private void logCanceledWhileRunningException(TransportException e) { - repLog.info("Cannot replicate to {}. It was canceled while running", uri, e); + repLog.atInfo().withCause(e).log("Cannot replicate to %s. It was canceled while running", uri); } private void createRepository() { @@ -448,10 +449,11 @@ try { Ref head = git.exactRef(Constants.HEAD); if (createProject(projectName, head != null ? 
getName(head) : null)) { - repLog.warn("Missing repository created; retry replication to {}", uri); + repLog.atWarning().log("Missing repository created; retry replication to %s", uri); pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING); } else { - repLog.warn("Missing repository could not be created when replicating {}", uri); + repLog.atWarning().log( + "Missing repository could not be created when replicating %s", uri); } } catch (IOException ioe) { stateLog.error( @@ -498,14 +500,14 @@ } if (replConfig.getMaxRefsToLog() == 0 || todo.size() <= replConfig.getMaxRefsToLog()) { - repLog.info("Push to {} references: {}", uri, refUpdatesForLogging(todo)); + repLog.atInfo().log("Push to %s references: %s", uri, lazy(() -> refUpdatesForLogging(todo))); } else { - repLog.info( - "Push to {} references (first {} of {} listed): {}", + repLog.atInfo().log( + "Push to %s references (first %d of %d listed): %s", uri, replConfig.getMaxRefsToLog(), todo.size(), - refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))); + lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog())))); } return tn.push(NullProgressMonitor.INSTANCE, todo); @@ -540,8 +542,8 @@ private List<RemoteRefUpdate> generateUpdates(Transport tn) throws IOException, PermissionBackendException { - ProjectState projectState = projectCache.checkedGet(projectName); - if (projectState == null) { + Optional<ProjectState> projectState = projectCache.get(projectName); + if (!projectState.isPresent()) { return Collections.emptyList(); } @@ -550,7 +552,7 @@ boolean filter; PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName); try { - projectState.checkStatePermitsRead(); + projectState.get().checkStatePermitsRead(); forProject.check(ProjectPermission.READ); filter = false; } catch (AuthException | ResourceConflictException e) { @@ -570,7 +572,11 @@ } local = n; } - local = forProject.filter(local, git, 
RefFilterOptions.builder().setFilterMeta(true).build()); + local = + forProject + .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build()) + .stream() + .collect(toMap(Ref::getName, r -> r)); } List<RemoteRefUpdate> remoteUpdatesList = @@ -587,7 +593,7 @@ Map<String, Ref> remote = listRemote(tn); for (Ref src : local.values()) { if (!canPushRef(src.getName(), noPerms)) { - repLog.debug("Skipping push of ref {}", src.getName()); + repLog.atFine().log("Skipping push of ref %s", src.getName()); continue; } @@ -604,7 +610,7 @@ if (config.isMirror()) { for (Ref ref : remote.values()) { if (Constants.HEAD.equals(ref.getName())) { - repLog.debug("Skipping deletion of {}", ref.getName()); + repLog.atFine().log("Skipping deletion of %s", ref.getName()); continue; } RefSpec spec = matchDst(ref.getName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java index 7538298..f96c157 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -42,17 +42,14 @@ OutputStream errStream = sshHelper.newErrorBufferStream(); try { sshHelper.executeRemoteSsh(uri, cmd, errStream); - repLog.info("Created remote repository: {}", uri); + repLog.atInfo().log("Created remote repository: %s", uri); } catch (IOException e) { - repLog.error( - "Error creating remote repository at {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - e, - cmd, - errStream); + repLog.atSevere().log( + "Error creating remote repository at %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, e, cmd, errStream); return false; } return true; @@ -65,17 +62,14 @@ OutputStream errStream = sshHelper.newErrorBufferStream(); try { sshHelper.executeRemoteSsh(uri, cmd, errStream); - repLog.info("Deleted remote repository: {}", uri); + repLog.atInfo().log("Deleted remote repository: %s", uri); } catch (IOException e) { - repLog.error( - "Error deleting remote repository at {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - e, - cmd, - errStream); + repLog.atSevere().log( + "Error deleting remote repository at %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, e, cmd, errStream); return false; } return true; @@ -90,16 +84,12 @@ try { sshHelper.executeRemoteSsh(uri, cmd, errStream); } catch (IOException e) { - repLog.error( - "Error updating HEAD of remote repository at {} to {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - newHead, - e, - cmd, - errStream); + repLog.atSevere().log( + "Error updating HEAD of remote repository at %s to %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, newHead, e, cmd, errStream); return false; } return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index b978952..8bbb180 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@ boolean isDefaultForceUpdate(); /** + * Returns the interval in seconds for running task distribution. + * + * @return number of seconds, zero if never. + */ + int getDistributionInterval(); + + /** * Returns the maximum number of ref-specs to log into the replication_log whenever a push * operation is completed against a replication end. *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index e99f6b1..2631cbe 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -47,9 +47,9 @@ try { config.load(); } catch (ConfigInvalidException e) { - repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e); + repLog.atSevere().withCause(e).log("Config file %s is invalid: %s", cfgPath, e.getMessage()); } catch (IOException e) { - repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e); + repLog.atSevere().withCause(e).log("Cannot read %s: %s", cfgPath, e.getMessage()); } this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false); this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); @@ -86,6 +86,11 @@ } @Override + public int getDistributionInterval() { + return config.getInt("replication", "distributionInterval", 0); + } + + @Override public int getMaxRefsToLog() { return maxRefsToLog; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java index fed09f9..60a9523 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
@@ -28,6 +28,6 @@ systemLog, serverInfo, ReplicationQueue.REPLICATION_LOG_NAME, - new PatternLayout("[%d] [%X{" + PushOne.ID_MDC_KEY + "}] %m%n")); + new PatternLayout("[%d] %m%n")); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 4625407..6af4156 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@ package com.googlesource.gerrit.plugins.replication; +import static java.util.concurrent.TimeUnit.SECONDS; + import com.google.auto.value.AutoValue; import com.google.common.collect.Queues; import com.google.gerrit.common.UsedAt; @@ -25,6 +27,7 @@ import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.WorkQueue; +import com.google.gerrit.util.logging.NamedFluentLogger; import com.google.inject.Inject; import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; @@ -35,9 +38,9 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Manages automatic replication to remote repositories. 
*/ public class ReplicationQueue @@ -47,10 +50,11 @@ ProjectDeletedListener, HeadUpdatedListener { static final String REPLICATION_LOG_NAME = "replication_log"; - static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME); + static final NamedFluentLogger repLog = NamedFluentLogger.forName(REPLICATION_LOG_NAME); private final ReplicationStateListener stateLog; + private final ReplicationConfig replConfig; private final WorkQueue workQueue; private final DynamicItem<EventDispatcher> dispatcher; private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency @@ -58,14 +62,17 @@ private volatile boolean running; private volatile boolean replaying; private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; + private Distributor distributor; @Inject ReplicationQueue( + ReplicationConfig rc, WorkQueue wq, Provider<ReplicationDestinations> rd, DynamicItem<EventDispatcher> dis, ReplicationStateListeners sl, ReplicationTasksStorage rts) { + replConfig = rc; workQueue = wq; dispatcher = dis; destinations = rd; @@ -84,15 +91,17 @@ t.setDaemon(true); t.start(); fireBeforeStartupEvents(); + distributor = new Distributor(workQueue); } } @Override public void stop() { running = false; + distributor.stop(); int discarded = destinations.get().shutdown(); if (discarded > 0) { - repLog.warn("Canceled {} replication events during shutdown", discarded); + repLog.atWarning().log("Canceled %d replication events during shutdown", discarded); } } @@ -172,7 +181,7 @@ cfg.schedule(project, refName, uri, state, now); } } else { - repLog.debug("Skipping ref {} on project {}", refName, project.get()); + repLog.atFine().log("Skipping ref %s on project %s", refName, project.get()); } if (withoutState) { state.markAllPushTasksScheduled(); @@ -185,22 +194,46 @@ replaying = true; for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { if (t == null) { - repLog.warn("Encountered null replication event in 
ReplicationTasksStorage"); + repLog.atWarning().log("Encountered null replication event in ReplicationTasksStorage"); continue; } try { fire(new URIish(t.uri), Project.nameKey(t.project), t.ref); } catch (URISyntaxException e) { - repLog.error("Encountered malformed URI for persisted event %s", t); + repLog.atSevere().withCause(e).log("Encountered malformed URI for persisted event %s", t); } } } catch (Throwable e) { - repLog.error("Unexpected error while firing pending events", e); + repLog.atSevere().withCause(e).log("Unexpected error while firing pending events"); } finally { replaying = false; } } + private void pruneCompleted() { + // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes. + // We also cannot access them by taskId since PushOnes don't have a taskId; they do have + // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue + // do use the same name as returned by toString() though, so that can be used to correlate + // PushOnes with queue tasks despite their wrappers.
+ Set<String> prunableTaskNames = new HashSet<>(); + for (Destination destination : destinations.get().getAll(FilterType.ALL)) { + prunableTaskNames.addAll(destination.getPrunableTaskNames()); + } + + for (WorkQueue.Task<?> task : workQueue.getTasks()) { + WorkQueue.Task.State state = task.getState(); + if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) { + if (task instanceof WorkQueue.ProjectTask) { + if (prunableTaskNames.contains(task.toString())) { + repLog.atFine().log("Pruning externally completed task: %s", task); + task.cancel(false); + } + } + } + } + } + @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey p = Project.nameKey(event.getProjectName()); @@ -220,7 +253,7 @@ for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) { String eventKey = String.format("%s:%s", event.projectName(), event.refName()); if (!eventsReplayed.contains(eventKey)) { - repLog.info("Firing pending task {}", event); + repLog.atInfo().log("Firing pending task %s", event); fire(event.projectName(), event.refName()); eventsReplayed.add(eventKey); } @@ -238,4 +271,49 @@ public abstract String refName(); } + + protected class Distributor implements WorkQueue.CancelableRunnable { + public ScheduledThreadPoolExecutor executor; + public ScheduledFuture<?> future; + + public Distributor(WorkQueue wq) { + int distributionInterval = replConfig.getDistributionInterval(); + if (distributionInterval > 0) { + executor = wq.createQueue(1, "Replication Distribution", false); + future = + executor.scheduleWithFixedDelay( + this, distributionInterval, distributionInterval, SECONDS); + } + } + + @Override + public void run() { + if (!running) { + return; + } + try { + firePendingEvents(); + pruneCompleted(); + } catch (Exception e) { + repLog.atSevere().withCause(e).log("error distributing tasks"); + } + } + + @Override + public void cancel() { + future.cancel(true); + } + + public void stop() { + if (executor != null) { 
+ cancel(); + executor.getQueue().remove(this); + } + } + + @Override + public String toString() { + return "Replication Distributor"; + } + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java index f2d55de..3e73033 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -31,19 +31,19 @@ @Override public void warn(String msg, ReplicationState... states) { stateWriteErr("Warning: " + msg, states); - repLog.warn(msg); + repLog.atWarning().log(msg); } @Override public void error(String msg, ReplicationState... states) { stateWriteErr("Error: " + msg, states); - repLog.error(msg); + repLog.atSevere().log(msg); } @Override public void error(String msg, Throwable t, ReplicationState... states) { stateWriteErr("Error: " + msg, states); - repLog.error(msg, t); + repLog.atSevere().withCause(t).log(msg); } private void stateWriteErr(String msg, ReplicationState[] states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index ead218b..35ecec6 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -48,9 +48,9 @@ * task: * * <p><code> - * .../building/<tmp_name> new replication tasks under construction - * .../running/<sha1> running replication tasks - * .../waiting/<sha1> outstanding replication tasks + * .../building/<tmp_name> new replication tasks under construction + * .../running/<sha1> running replication tasks + * .../waiting/<task_sha1> outstanding replication tasks * </code> * * <p>Tasks are moved atomically via a rename between those directories to indicate the current @@ -114,12 +114,18 @@ } public synchronized void resetAll() { - for (ReplicateRefUpdate r : listRunning()) { + for (ReplicateRefUpdate r : list(createDir(runningUpdates))) { new Task(r).reset(); } } - public synchronized void finish(UriUpdates uriUpdates) { + public boolean isWaiting(UriUpdates uriUpdates) { + return uriUpdates.getReplicateRefUpdates().stream() + .map(update -> new Task(update)) + .anyMatch(Task::isWaiting); + } + + public void finish(UriUpdates uriUpdates) { for (ReplicateRefUpdate update : uriUpdates.getReplicateRefUpdates()) { new Task(update).finish(); } @@ -179,14 +185,12 @@ @VisibleForTesting class Task { public final ReplicateRefUpdate update; - public final String json; public final String taskKey; public final Path running; public final Path waiting; public Task(ReplicateRefUpdate update) { this.update = update; - json = GSON.toJson(update) + "\n"; String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote; taskKey = sha1(key).name(); running = createDir(runningUpdates).resolve(taskKey); @@ -198,6 +202,7 @@ return taskKey; } + String json = GSON.toJson(update) + "\n"; try { Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null); logger.atFine().log("CREATE %s %s", tmp, updateLog()); @@ -218,6 +223,10 @@ rename(running, waiting); } + public boolean isWaiting() { + return Files.exists(waiting); + } + public void finish() { try { logger.atFine().log("DELETE %s %s", running, updateLog());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java index ffa6be1..fdbd5e7 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -58,7 +58,8 @@ return; } - repLog.warn("Cannot update HEAD of project {} on remote site {}.", project, replicateURI); + repLog.atWarning().log( + "Cannot update HEAD of project %s on remote site %s.", project, replicateURI); } @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 06f5f48..d606b7b 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@ : Timeout for SSH connections. If 0, there is no timeout and the client waits indefinitely. By default, 2 minutes. +replication.distributionInterval +: Interval in seconds for running the replication distributor. When + run, the replication distributor will add all persisted waiting tasks + to the queue to ensure that externally loaded tasks are visible to + the current process. If zero, turn off the replication distributor. By + default, zero. + + Turning this on is likely only useful when there are other processes + (such as other masters in the same cluster) writing to the same + persistence store. To ensure that updates are seen well before their + replicationDelay expires when the distributor is used, the recommended + value for this is approximately the smallest remote.NAME.replicationDelay + divided by 5. + <a name="replication.updateRefErrorMaxRetries">replication.updateRefErrorMaxRetries</a> : Number of times to retry a replication operation if an update ref error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index c09fcd1..94f0dc4 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -245,9 +246,9 @@ return push; } - private void setupProjectCacheMock() throws IOException { + private void setupProjectCacheMock() { projectCacheMock = mock(ProjectCache.class); - when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock); + when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock)); } private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java index efacae7..79b05cc 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@ assertThatContainsDestination(destinations, remoteName1, remoteUrl1); assertThatContainsDestination(destinations, remoteName2, remoteUrl2); } + + @Test + public void shouldSkipFetchRefSpecs() throws Exception { + FileBasedConfig config = newReplicationConfig(); + String pushRemote = "pushRemote"; + final String aRemoteURL = "ssh://somewhere/${name}.git"; + config.setString("remote", pushRemote, "url", aRemoteURL); + + String fetchRemote = "fetchRemote"; + config.setString("remote", fetchRemote, "url", aRemoteURL); + config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*"); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(newReplicationFileBasedConfig()); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + + assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL); + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index ce87a13..fdea8d0 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -46,9 +46,10 @@ name = "replication", sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") public class ReplicationIT extends ReplicationDaemon { + private static final int TEST_REPLICATION_DELAY = 1; + private static final int TEST_REPLICATION_RETRY = 1; private static final Duration TEST_TIMEOUT = - Duration.ofSeconds( - (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + 1); + Duration.ofSeconds((TEST_REPLICATION_DELAY + TEST_REPLICATION_RETRY * 60) + 1); @Inject private DynamicSet<ProjectDeletedListener> deletedListeners;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java index 67c04b2..31a4f49 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationStorageIT.java
@@ -22,8 +22,13 @@ import com.google.gerrit.acceptance.UseLocalDisk; import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.api.projects.BranchInput; +import com.googlesource.gerrit.plugins.replication.Destination.QueueInfo; +import com.googlesource.gerrit.plugins.replication.ReplicationConfig.FilterType; import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate; +import java.io.IOException; import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.HashMap; @@ -32,6 +37,7 @@ import java.util.Optional; import java.util.regex.Pattern; import java.util.stream.Stream; +import org.eclipse.jgit.transport.URIish; import org.junit.Test; /** @@ -49,7 +55,14 @@ protected static final int TEST_REPLICATION_MAX_RETRIES = 1; protected static final Duration TEST_TASK_FINISH_TIMEOUT = Duration.ofSeconds(TEST_TASK_FINISH_SECONDS); + private static final Duration MAX_RETRY_WITH_TOLERANCE_TIMEOUT = + Duration.ofSeconds( + (TEST_REPLICATION_DELAY_SECONDS + TEST_REPLICATION_RETRY_MINUTES * 60) + * TEST_REPLICATION_MAX_RETRIES + + 10); protected ReplicationTasksStorage tasksStorage; + private DestinationsCollection destinationCollection; + private ReplicationConfig replicationConfig; @Override public void setUpTestPlugin() throws Exception { @@ -60,6 +73,8 @@ Optional.of("not-used-project")); // Simulates a full replication.config initialization super.setUpTestPlugin(); tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); + destinationCollection = plugin.getSysInjector().getInstance(DestinationsCollection.class); + replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class); } @Test @@ -270,6 +285,53 @@ WaitUtil.waitUntil(() -> tasksStorage.listRunning().size() == 0, TEST_TASK_FINISH_TIMEOUT); } + @Test + public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() 
throws Exception { + setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS); + config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0); + config.save(); + reloadConfig(); + assertThat(tasksStorage.listRunning()).hasSize(0); + Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project"); + + WaitUtil.waitUntil( + () -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git")), + TEST_NEW_PROJECT_TIMEOUT); + WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT); + } + + @Test + public void shouldCleanupBothTasksAndLocksAfterReplicationCancelledAfterMaxRetries() + throws Exception { + String projectName = "task_cleanup_locks_project_cancelled"; + String remoteDestination = "http://invalidurl:9090/"; + URIish urish = new URIish(remoteDestination + projectName + ".git"); + + setReplicationDestination(projectName, "replica", Optional.of(projectName)); + // replace correct urls with invalid one to trigger retry + config.setString("remote", projectName, "url", remoteDestination + "${name}.git"); + config.setInt("remote", projectName, "replicationMaxRetries", TEST_REPLICATION_MAX_RETRIES); + config.save(); + reloadConfig(); + Destination destination = + destinationCollection.getAll(FilterType.ALL).stream() + .filter(dest -> dest.getProjects().contains(projectName)) + .findFirst() + .get(); + + createTestProject(projectName); + + WaitUtil.waitUntil( + () -> isTaskRescheduled(destination.getQueueInfo(), urish), TEST_NEW_PROJECT_TIMEOUT); + // replicationRetry is set to 1 minute which is the minimum value. 
That's why + // should be safe to get the pushOne object from pending because it should be + // here for one minute + PushOne pushOp = destination.getQueueInfo().pending.get(urish); + + WaitUtil.waitUntil(() -> pushOp.wasCanceled(), MAX_RETRY_WITH_TOLERANCE_TIMEOUT); + WaitUtil.waitUntil(() -> isTaskCleanedUp(), TEST_TASK_FINISH_TIMEOUT); + } + private void replicateBranchDeletion(boolean mirror) throws Exception { setReplicationDestination("foo", "replica", ALL_PROJECTS); reloadConfig(); @@ -290,6 +352,21 @@ assertThat(listWaitingReplicationTasks(branchToDelete)).hasSize(1); } + private boolean isTaskRescheduled(QueueInfo queue, URIish uri) { + PushOne pushOne = queue.pending.get(uri); + return pushOne == null ? false : pushOne.isRetrying(); + } + + private boolean isTaskCleanedUp() { + Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates"); + Path runningUpdates = refUpdates.resolve("running"); + try { + return Files.list(runningUpdates).count() == 0; + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + private Stream<ReplicateRefUpdate> waitingChangeReplicationTasksForRemote( String changeRef, String remote) { return tasksStorage.listWaiting().stream()
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java index 38c2905..d9fbbe5 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTaskTest.java
@@ -347,11 +347,11 @@ } protected static void assertIsWaiting(Task task) { - assertTrue(whiteBoxIsWaiting(task)); + assertTrue(task.isWaiting()); } protected static void assertNotWaiting(Task task) { - assertFalse(whiteBoxIsWaiting(task)); + assertFalse(task.isWaiting()); } protected static void assertIsRunning(Task task) { @@ -366,10 +366,6 @@ return Files.exists(task.running); } - private static boolean whiteBoxIsWaiting(Task task) { - return Files.exists(task.waiting); - } - public static URIish getUrish(String uri) { try { return new URIish(uri);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java index 97992c8..141f739 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorageTest.java
@@ -16,6 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; @@ -71,10 +72,20 @@ } @Test + public void canCheckIfUpdateIsWaiting() { + storage.create(REF_UPDATE); + assertTrue(storage.isWaiting(uriUpdates)); + + storage.start(uriUpdates); + assertFalse(storage.isWaiting(uriUpdates)); + } + + @Test public void canStartWaitingUpdate() throws Exception { storage.create(REF_UPDATE); storage.start(uriUpdates); assertThat(storage.listWaiting()).isEmpty(); + assertFalse(storage.isWaiting(uriUpdates)); assertContainsExactly(storage.listRunning(), REF_UPDATE); } @@ -128,6 +139,8 @@ String keyA = storage.create(REF_UPDATE); String keyB = storage.create(updateB); assertThat(storage.listWaiting()).hasSize(2); + assertTrue(storage.isWaiting(uriUpdates)); + assertTrue(storage.isWaiting(TestUriUpdates.create(updateB))); assertNotEquals(keyA, keyB); } @@ -187,6 +200,8 @@ storage.create(REF_UPDATE); storage.create(updateB); assertThat(storage.listWaiting()).hasSize(2); + assertTrue(storage.isWaiting(uriUpdates)); + assertTrue(storage.isWaiting(TestUriUpdates.create(updateB))); } @Test @@ -198,6 +213,8 @@ String keyB = storage.create(refB); assertThat(storage.listWaiting()).hasSize(2); assertNotEquals(keyA, keyB); + assertTrue(storage.isWaiting(TestUriUpdates.create(refA))); + assertTrue(storage.isWaiting(TestUriUpdates.create(refB))); } @Test @@ -236,6 +253,7 @@ storage.start(uriUpdates); assertContainsExactly(storage.listRunning(), REF_UPDATE); + assertFalse(storage.isWaiting(uriUpdates)); assertThat(storage.listWaiting()).isEmpty(); storage.finish(uriUpdates); @@ -255,6 +273,7 @@ storage.resetAll(); assertContainsExactly(storage.listWaiting(), REF_UPDATE); + assertTrue(storage.isWaiting(uriUpdates)); assertThat(storage.listRunning()).isEmpty(); } @@ -267,6 +286,7 @@ 
storage.start(uriUpdates); assertContainsExactly(storage.listRunning(), REF_UPDATE); assertThat(storage.listWaiting()).isEmpty(); + assertFalse(storage.isWaiting(uriUpdates)); storage.finish(uriUpdates); assertNoIncompleteTasks(storage);