Merge branch 'stable-3.1' * stable-3.1: Make SecureCredentialsFactory public Change-Id: I23ee1cc85100066704ba2157286654da38b0c3c6
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java index fe5dbad..782ff4f 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -67,6 +67,11 @@ } @Override + public int getDistributionInterval() { + return currentConfig.getDistributionInterval(); + } + + @Override public synchronized int getMaxRefsToLog() { return currentConfig.getMaxRefsToLog(); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java index fa26e82..424648e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/CreateProjectTask.java
@@ -63,7 +63,8 @@ return true; } - repLog.warn("Cannot create new project {} on remote site {}.", projectName, replicateURI); + repLog.atWarning().log( + "Cannot create new project %s on remote site %s.", projectName, replicateURI); return false; } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java index 4617672..8ea7227 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DeleteProjectTask.java
@@ -55,7 +55,7 @@ return; } - repLog.warn("Cannot delete project {} on remote site {}.", project, replicateURI); + repLog.atWarning().log("Cannot delete project %s on remote site %s.", project, replicateURI); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java index 85c3917..6907ad0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.gerrit.server.project.ProjectCache.noSuchProject; import static com.googlesource.gerrit.plugins.replication.PushResultProcessing.resolveNodeName; import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName; import static org.eclipse.jgit.transport.RemoteRefUpdate.Status.NON_EXISTING; @@ -30,6 +31,7 @@ import com.google.gerrit.entities.BranchNameKey; import com.google.gerrit.entities.Project; import com.google.gerrit.entities.RefNames; +import com.google.gerrit.exceptions.StorageException; import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.extensions.restapi.AuthException; @@ -51,6 +53,7 @@ import com.google.gerrit.server.project.ProjectCache; import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.util.RequestContext; +import com.google.gerrit.util.logging.NamedFluentLogger; import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Provider; @@ -64,10 +67,13 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -81,10 +87,9 @@ import org.eclipse.jgit.transport.RemoteConfig; import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; public class Destination { - private static final Logger repLog = ReplicationQueue.repLog; + private static final NamedFluentLogger repLog = ReplicationQueue.repLog; private static final 
String PROJECT_NOT_AVAILABLE = "source project %s not available"; @@ -94,7 +99,9 @@ private final ReplicationStateListener stateLog; private final Object stateLock = new Object(); - private final Map<URIish, PushOne> pending = new HashMap<>(); + // writes are covered by the stateLock, but some reads are still + // allowed without the lock + private final ConcurrentMap<URIish, PushOne> pending = new ConcurrentHashMap<>(); private final Map<URIish, PushOne> inFlight = new HashMap<>(); private final PushOne.Factory opFactory; private final DeleteProjectTask.Factory deleteProjectFactory; @@ -156,7 +163,7 @@ builder.add(g.getUUID()); addRecursiveParents(g.getUUID(), builder, groupIncludeCache); } else { - repLog.warn("Group \"{}\" not recognized, removing from authGroup", name); + repLog.atWarning().log("Group \"%s\" not recognized, removing from authGroup", name); } } remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build())); @@ -240,11 +247,9 @@ int numInFlight = inFlight.size(); if (numPending > 0 || numInFlight > 0) { - repLog.warn( - "Cancelling replication events (pending={}, inFlight={}) for destination {}", - numPending, - numInFlight, - getRemoteConfigName()); + repLog.atWarning().log( + "Cancelling replication events (pending=%d, inFlight=%d) for destination %s", + numPending, numInFlight, getRemoteConfigName()); foreachPushOp( pending, @@ -280,7 +285,8 @@ if (!config.replicateHiddenProjects() && state.getProject().getState() == com.google.gerrit.extensions.client.ProjectState.HIDDEN) { - repLog.debug("Project {} is hidden and replication of hidden projects is disabled", name); + repLog.atFine().log( + "Project %s is hidden and replication of hidden projects is disabled", name); return false; } @@ -293,10 +299,9 @@ permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck); return true; } catch (AuthException e) { - repLog.debug( - "Project {} is not visible to current user {}", - name, - 
user.getUserName().orElse("unknown")); + repLog.atFine().log( + "Project %s is not visible to current user %s", + name, user.getUserName().orElse("unknown")); return false; } } @@ -309,21 +314,22 @@ () -> { ProjectState projectState; try { - projectState = projectCache.checkedGet(project); - } catch (IOException e) { - repLog.warn("Error reading project {} from cache", project, e); + projectState = projectCache.get(project).orElseThrow(noSuchProject(project)); + } catch (StorageException e) { + repLog.atWarning().withCause(e).log( + "Error reading project %s from cache", project); return false; } if (projectState == null) { - repLog.debug("Project {} does not exist", project); + repLog.atFine().log("Project %s does not exist", project); throw new NoSuchProjectException(project); } if (!projectState.statePermitsRead()) { - repLog.debug("Project {} does not permit read", project); + repLog.atFine().log("Project %s does not permit read", project); return false; } if (!shouldReplicate(projectState, userProvider.get())) { - repLog.debug("Project {} should not be replicated", project); + repLog.atFine().log("Project %s should not be replicated", project); return false; } if (PushOne.ALL_REFS.equals(ref)) { @@ -336,11 +342,9 @@ .ref(ref) .check(RefPermission.READ); } catch (AuthException e) { - repLog.debug( - "Ref {} on project {} is not visible to calling user {}", - ref, - project, - userProvider.get().getUserName().orElse("unknown")); + repLog.atFine().log( + "Ref %s on project %s is not visible to calling user %s", + ref, project, userProvider.get().getUserName().orElse("unknown")); return false; } return true; @@ -362,13 +366,10 @@ () -> { ProjectState projectState; try { - projectState = projectCache.checkedGet(project); - } catch (IOException e) { + projectState = projectCache.get(project).orElseThrow(noSuchProject(project)); + } catch (StorageException e) { return false; } - if (projectState == null) { - throw new NoSuchProjectException(project); - } return 
shouldReplicate(projectState, userProvider.get()); }) .call(); @@ -388,10 +389,10 @@ void schedule( Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) { if (!shouldReplicate(project, ref, state)) { - repLog.debug("Not scheduling replication {}:{} => {}", project, ref, uri); + repLog.atFine().log("Not scheduling replication %s:%s => %s", project, ref, uri); return; } - repLog.info("scheduling replication {}:{} => {}", project, ref, uri); + repLog.atInfo().log("scheduling replication %s:%s => %s", project, ref, uri); if (!config.replicatePermissions()) { PushOne e; @@ -433,7 +434,8 @@ task.addState(ref, state); } state.increasePushTaskCount(project.get(), ref); - repLog.info("scheduled {}:{} => {} to run after {}s", project, ref, task, config.getDelay()); + repLog.atInfo().log( + "scheduled %s:%s => %s to run after %ds", project, ref, task, config.getDelay()); } } @@ -580,7 +582,9 @@ if (inFlightOp != null) { return RunwayStatus.denied(inFlightOp.getId()); } - replicationTasksStorage.get().start(op); + if (!replicationTasksStorage.get().start(op)) { + return RunwayStatus.deniedExternal(); + } inFlight.put(op.getURI(), op); } return RunwayStatus.allowed(); @@ -595,9 +599,20 @@ } } + public Set<String> getPrunableTaskNames() { + Set<String> names = new HashSet<>(); + for (PushOne push : pending.values()) { + if (!replicationTasksStorage.get().isWaiting(push)) { + repLog.atFine().log("No longer isWaiting, can prune %s", push.getURI()); + names.add(push.toString()); + } + } + return names; + } + boolean wouldPushProject(Project.NameKey project) { if (!shouldReplicate(project)) { - repLog.debug("Skipping replication of project {}", project.get()); + repLog.atFine().log("Skipping replication of project %s", project.get()); return false; } @@ -609,7 +624,8 @@ boolean matches = (new ReplicationFilter(projects)).matches(project); if (!matches) { - repLog.debug("Skipping replication of project {}; does not match filter", project.get()); + 
repLog.atFine().log( + "Skipping replication of project %s; does not match filter", project.get()); } return matches; } @@ -620,7 +636,7 @@ boolean wouldPushRef(String ref) { if (!config.replicatePermissions() && RefNames.REFS_CONFIG.equals(ref)) { - repLog.debug("Skipping push of ref {}; it is a meta ref", ref); + repLog.atFine().log("Skipping push of ref %s; it is a meta ref", ref); return false; } if (PushOne.ALL_REFS.equals(ref)) { @@ -631,7 +647,7 @@ return true; } } - repLog.debug("Skipping push of ref {}; it does not match push ref specs", ref); + repLog.atFine().log("Skipping push of ref %s; it does not match push ref specs", ref); return false; } @@ -671,7 +687,7 @@ } else if (remoteNameStyle.equals("basenameOnly")) { name = FilenameUtils.getBaseName(name); } else if (!remoteNameStyle.equals("slash")) { - repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle); + repLog.atFine().log("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle); } String replacedPath = replaceName(template.getPath(), name, isSingleProjectMatch()); return (replacedPath != null) ? template.setPath(replacedPath) : template; @@ -757,7 +773,7 @@ try { eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { - repLog.error("error posting event", e); + repLog.atSevere().withCause(e).log("error posting event"); } } } @@ -771,7 +787,7 @@ try { eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event); } catch (PermissionBackendException e) { - repLog.error("error posting event", e); + repLog.atSevere().withCause(e).log("error posting event"); } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java index 4050c9c..9b6d431 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfigParser.java
@@ -14,6 +14,8 @@ package com.googlesource.gerrit.plugins.replication; +import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.flogger.FluentLogger; @@ -54,6 +56,11 @@ continue; } + if (!c.getFetchRefSpecs().isEmpty()) { + repLog.atInfo().log("Ignore '%s' endpoint: not a 'push' target", c.getName()); + continue; + } + // If destination for push is not set assume equal to source. for (RefSpec ref : c.getPushRefSpecs()) { if (ref.getDestination() == null) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java index fcf5dfa..ecfbb8e 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationsCollection.java
@@ -106,7 +106,7 @@ try { uri = new URIish(url); } catch (URISyntaxException e) { - repLog.warn("adminURL '{}' is invalid: {}", url, e.getMessage()); + repLog.atWarning().log("adminURL '%s' is invalid: %s", url, e.getMessage()); continue; } @@ -114,13 +114,14 @@ String path = replaceName(uri.getPath(), projectName.get(), config.isSingleProjectMatch()); if (path == null) { - repLog.warn("adminURL {} does not contain ${name}", uri); + repLog.atWarning().log("adminURL %s does not contain ${name}", uri); continue; } uri = uri.setPath(path); if (!isSSH(uri)) { - repLog.warn("adminURL '{}' is invalid: only SSH and HTTP are supported", uri); + repLog.atWarning().log( + "adminURL '%s' is invalid: only SSH and HTTP are supported", uri); continue; } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java index 4cc9974..5b24bf5 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/FanoutReplicationConfig.java
@@ -152,6 +152,11 @@ } @Override + public int getDistributionInterval() { + return replicationConfig.getDistributionInterval(); + } + + @Override public String getVersion() { Hasher hasher = Hashing.murmur3_128().newHasher(); hasher.putString(replicationConfig.getVersion(), UTF_8);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java index 66130f9..fa81dd0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/GerritRestApi.java
@@ -62,34 +62,34 @@ @Override public boolean createProject(Project.NameKey project, String head) { - repLog.info("Creating project {} on {}", project, uri); + repLog.atInfo().log("Creating project %s on %s", project, uri); String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get())); try { return httpClient .execute(new HttpPut(url), new HttpResponseHandler(), getContext()) .isSuccessful(); } catch (IOException e) { - repLog.error("Couldn't perform project creation on {}", uri, e); + repLog.atSevere().withCause(e).log("Couldn't perform project creation on %s", uri); return false; } } @Override public boolean deleteProject(Project.NameKey project) { - repLog.info("Deleting project {} on {}", project, uri); + repLog.atInfo().log("Deleting project %s on %s", project, uri); String url = String.format("%s/a/projects/%s", toHttpUri(uri), Url.encode(project.get())); try { httpClient.execute(new HttpDelete(url), new HttpResponseHandler(), getContext()); return true; } catch (IOException e) { - repLog.error("Couldn't perform project deletion on {}", uri, e); + repLog.atSevere().withCause(e).log("Couldn't perform project deletion on %s", uri); } return false; } @Override public boolean updateHead(Project.NameKey project, String newHead) { - repLog.info("Updating head of {} on {}", project, uri); + repLog.atInfo().log("Updating head of %s on %s", project, uri); String url = String.format("%s/a/projects/%s/HEAD", toHttpUri(uri), Url.encode(project.get())); try { HttpPut req = new HttpPut(url); @@ -98,7 +98,7 @@ httpClient.execute(req, new HttpResponseHandler(), getContext()); return true; } catch (IOException e) { - repLog.error("Couldn't perform update head on {}", uri, e); + repLog.atSevere().withCause(e).log("Couldn't perform update head on %s", uri); } return false; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java index da960e6..b092363 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/LocalFS.java
@@ -43,9 +43,9 @@ u.disableRefLog(); u.link(head); } - repLog.info("Created local repository: {}", uri); + repLog.atInfo().log("Created local repository: %s", uri); } catch (IOException e) { - repLog.error("Error creating local repository {}", uri.getPath(), e); + repLog.atSevere().withCause(e).log("Error creating local repository %s", uri.getPath()); return false; } return true; @@ -55,9 +55,9 @@ public boolean deleteProject(Project.NameKey project) { try { recursivelyDelete(new File(uri.getPath())); - repLog.info("Deleted local repository: {}", uri); + repLog.atInfo().log("Deleted local repository: %s", uri); } catch (IOException e) { - repLog.error("Error deleting local repository {}:\n", uri.getPath(), e); + repLog.atSevere().withCause(e).log("Error deleting local repository %s:\n", uri.getPath()); return false; } return true; @@ -71,7 +71,8 @@ u.link(newHead); } } catch (IOException e) { - repLog.error("Failed to update HEAD of repository {} to {}", uri.getPath(), newHead, e); + repLog.atSevere().withCause(e).log( + "Failed to update HEAD of repository %s to %s", uri.getPath(), newHead); return false; } return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java index 634f44d..eff06ce 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -14,6 +14,7 @@ package com.googlesource.gerrit.plugins.replication; +import static com.google.common.flogger.LazyArgs.lazy; import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -37,6 +38,7 @@ import com.google.gerrit.server.git.ProjectRunnable; import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning; import com.google.gerrit.server.ioutil.HexFormat; +import com.google.gerrit.server.logging.TraceContext; import com.google.gerrit.server.permissions.PermissionBackend; import com.google.gerrit.server.permissions.PermissionBackend.RefFilterOptions; import com.google.gerrit.server.permissions.PermissionBackendException; @@ -56,6 +58,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jgit.errors.NoRemoteRepositoryException; @@ -76,7 +79,6 @@ import org.eclipse.jgit.transport.RemoteRefUpdate; import org.eclipse.jgit.transport.Transport; import org.eclipse.jgit.transport.URIish; -import org.slf4j.MDC; /** * A push to remote operation started by {@link GitReferenceUpdatedListener}. 
@@ -87,7 +89,7 @@ class PushOne implements ProjectRunnable, CanceledWhileRunning { private final ReplicationStateListener stateLog; static final String ALL_REFS = "..all.."; - static final String ID_MDC_KEY = "pushOneId"; + static final String ID_KEY = "pushOneId"; interface Factory { PushOne create(Project.NameKey d, URIish u); @@ -169,17 +171,16 @@ @Override public void cancel() { - repLog.info("Replication [{}] to {} was canceled", HexFormat.fromInt(id), getURI()); + repLog.atInfo().log("Replication [%s] to %s was canceled", HexFormat.fromInt(id), getURI()); canceledByReplication(); pool.pushWasCanceled(this); } @Override public void setCanceledWhileRunning() { - repLog.info( - "Replication [{}] to {} was canceled while being executed", - HexFormat.fromInt(id), - getURI()); + repLog.atInfo().log( + "Replication [%s] to %s was canceled while being executed", + HexFormat.fromInt(id), getURI()); canceledWhileRunning.set(true); } @@ -238,9 +239,10 @@ if (ALL_REFS.equals(ref)) { delta.clear(); pushAllRefs = true; - repLog.trace("Added all refs for replication to {}", uri); + repLog.atFinest().log("Added all refs for replication to %s", uri); } else if (!pushAllRefs && delta.add(ref)) { - repLog.trace("Added ref {} for replication to {}", ref, uri); + delta.add(ref); + repLog.atFinest().log("Added ref %s for replication to %s", ref, uri); } } @@ -317,28 +319,35 @@ } private void runPushOperation() { + try (TraceContext ctx = TraceContext.open().addTag(ID_KEY, HexFormat.fromInt(id))) { + doRunPushOperation(); + } + } + + private void doRunPushOperation() { // Lock the queue, and remove ourselves, so we can't be modified once // we start replication (instead a new instance, with the same URI, is // created and scheduled for a future point in time.) 
// - MDC.put(ID_MDC_KEY, HexFormat.fromInt(id)); RunwayStatus status = pool.requestRunway(this); isCollision = false; if (!status.isAllowed()) { if (status.isCanceled()) { - repLog.info("PushOp for replication to {} was canceled and thus won't be rescheduled", uri); + repLog.atInfo().log( + "PushOp for replication to %s was canceled and thus won't be rescheduled", uri); + } else if (status.isExternalInflight()) { + repLog.atInfo().log("PushOp for replication to %s was denied externally", uri); } else { - repLog.info( - "Rescheduling replication to {} to avoid collision with the in-flight push [{}].", - uri, - HexFormat.fromInt(status.getInFlightPushId())); + repLog.atInfo().log( + "Rescheduling replication to %s to avoid collision with the in-flight push [%s].", + uri, HexFormat.fromInt(status.getInFlightPushId())); pool.reschedule(this, Destination.RetryReason.COLLISION); isCollision = true; } return; } - repLog.info("Replication to {} started...", uri); + repLog.atInfo().log("Replication to %s started...", uri); Timer1.Context<String> destinationContext = metrics.start(config.getName()); try { long startedAt = destinationContext.getStartTime(); @@ -353,12 +362,9 @@ config.getName(), projectName.get(), pool.getSlowLatencyThreshold(), elapsed); } retryDone(); - repLog.info( - "Replication to {} completed in {}ms, {}ms delay, {} retries", - uri, - elapsed, - delay, - retryCount); + repLog.atInfo().log( + "Replication to %s completed in %dms, %dms delay, %d retries", + uri, elapsed, delay, retryCount); } catch (RepositoryNotFoundException e) { stateLog.error( "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(), @@ -375,7 +381,7 @@ || msg.contains("unavailable")) { createRepository(); } else { - repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg); + repLog.atSevere().log("Cannot replicate %s; Remote repository error: %s", projectName, msg); } } catch (NoRemoteRepositoryException e) { @@ -385,10 +391,10 @@ } 
catch (TransportException e) { Throwable cause = e.getCause(); if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) { - repLog.error("Cannot replicate to {}: {}", uri, cause.getMessage()); + repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage()); } else if (e instanceof LockFailureException) { lockRetryCount++; - repLog.error("Cannot replicate to {} due to lock failure", uri); + repLog.atSevere().log("Cannot replicate to %s due to lock failure", uri); // The remote push operation should be retried. if (lockRetryCount <= maxLockRetries) { @@ -398,14 +404,14 @@ pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR); } } else { - repLog.error( - "Giving up after {} lock failures during replication to {}", lockRetryCount, uri); + repLog.atSevere().log( + "Giving up after %d lock failures during replication to %s", lockRetryCount, uri); } } else { if (canceledWhileRunning.get()) { logCanceledWhileRunningException(e); } else { - repLog.error("Cannot replicate to {}", uri, e); + repLog.atSevere().withCause(e).log("Cannot replicate to %s", uri); // The remote push operation should be retried. pool.reschedule(this, Destination.RetryReason.TRANSPORT_ERROR); } @@ -423,7 +429,7 @@ } private void logCanceledWhileRunningException(TransportException e) { - repLog.info("Cannot replicate to {}. It was canceled while running", uri, e); + repLog.atInfo().withCause(e).log("Cannot replicate to %s. It was canceled while running", uri); } private void createRepository() { @@ -431,10 +437,11 @@ try { Ref head = git.exactRef(Constants.HEAD); if (createProject(projectName, head != null ? 
getName(head) : null)) { - repLog.warn("Missing repository created; retry replication to {}", uri); + repLog.atWarning().log("Missing repository created; retry replication to %s", uri); pool.reschedule(this, Destination.RetryReason.REPOSITORY_MISSING); } else { - repLog.warn("Missing repository could not be created when replicating {}", uri); + repLog.atWarning().log( + "Missing repository could not be created when replicating %s", uri); } } catch (IOException ioe) { stateLog.error( @@ -481,14 +488,14 @@ } if (replConfig.getMaxRefsToLog() == 0 || todo.size() <= replConfig.getMaxRefsToLog()) { - repLog.info("Push to {} references: {}", uri, refUpdatesForLogging(todo)); + repLog.atInfo().log("Push to %s references: %s", uri, lazy(() -> refUpdatesForLogging(todo))); } else { - repLog.info( - "Push to {} references (first {} of {} listed): {}", + repLog.atInfo().log( + "Push to %s references (first %d of %d listed): %s", uri, replConfig.getMaxRefsToLog(), todo.size(), - refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog()))); + lazy(() -> refUpdatesForLogging(todo.subList(0, replConfig.getMaxRefsToLog())))); } return tn.push(NullProgressMonitor.INSTANCE, todo); @@ -523,8 +530,8 @@ private List<RemoteRefUpdate> generateUpdates(Transport tn) throws IOException, PermissionBackendException { - ProjectState projectState = projectCache.checkedGet(projectName); - if (projectState == null) { + Optional<ProjectState> projectState = projectCache.get(projectName); + if (!projectState.isPresent()) { return Collections.emptyList(); } @@ -533,7 +540,7 @@ boolean filter; PermissionBackend.ForProject forProject = permissionBackend.currentUser().project(projectName); try { - projectState.checkStatePermitsRead(); + projectState.get().checkStatePermitsRead(); forProject.check(ProjectPermission.READ); filter = false; } catch (AuthException | ResourceConflictException e) { @@ -553,7 +560,11 @@ } local = n; } - local = forProject.filter(local, git, 
RefFilterOptions.builder().setFilterMeta(true).build()); + local = + forProject + .filter(local.values(), git, RefFilterOptions.builder().setFilterMeta(true).build()) + .stream() + .collect(toMap(Ref::getName, r -> r)); } List<RemoteRefUpdate> remoteUpdatesList = @@ -570,7 +581,7 @@ Map<String, Ref> remote = listRemote(tn); for (Ref src : local.values()) { if (!canPushRef(src.getName(), noPerms)) { - repLog.debug("Skipping push of ref {}", src.getName()); + repLog.atFine().log("Skipping push of ref %s", src.getName()); continue; } @@ -587,7 +598,7 @@ if (config.isMirror()) { for (Ref ref : remote.values()) { if (Constants.HEAD.equals(ref.getName())) { - repLog.debug("Skipping deletion of {}", ref.getName()); + repLog.atFine().log("Skipping deletion of %s", ref.getName()); continue; } RefSpec spec = matchDst(ref.getName());
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java index 7538298..f96c157 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RemoteSsh.java
@@ -42,17 +42,14 @@ OutputStream errStream = sshHelper.newErrorBufferStream(); try { sshHelper.executeRemoteSsh(uri, cmd, errStream); - repLog.info("Created remote repository: {}", uri); + repLog.atInfo().log("Created remote repository: %s", uri); } catch (IOException e) { - repLog.error( - "Error creating remote repository at {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - e, - cmd, - errStream); + repLog.atSevere().log( + "Error creating remote repository at %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, e, cmd, errStream); return false; } return true; @@ -65,17 +62,14 @@ OutputStream errStream = sshHelper.newErrorBufferStream(); try { sshHelper.executeRemoteSsh(uri, cmd, errStream); - repLog.info("Deleted remote repository: {}", uri); + repLog.atInfo().log("Deleted remote repository: %s", uri); } catch (IOException e) { - repLog.error( - "Error deleting remote repository at {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - e, - cmd, - errStream); + repLog.atSevere().log( + "Error deleting remote repository at %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, e, cmd, errStream); return false; } return true; @@ -90,16 +84,12 @@ try { sshHelper.executeRemoteSsh(uri, cmd, errStream); } catch (IOException e) { - repLog.error( - "Error updating HEAD of remote repository at {} to {}:\n" - + " Exception: {}\n" - + " Command: {}\n" - + " Output: {}", - uri, - newHead, - e, - cmd, - errStream); + repLog.atSevere().log( + "Error updating HEAD of remote repository at %s to %s:\n" + + " Exception: %s\n" + + " Command: %s\n" + + " Output: %s", + uri, newHead, e, cmd, errStream); return false; } return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java index d3d64db..99e5ee4 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -45,6 +45,13 @@ boolean isDefaultForceUpdate(); /** + * Returns the interval in seconds for running task distribution. + * + * @return number of seconds, zero if never. + */ + int getDistributionInterval(); + + /** * Returns the maximum number of ref-specs to log into the replication_log whenever a push * operation is completed against a replication end. *
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java index e99f6b1..2631cbe 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -47,9 +47,9 @@ try { config.load(); } catch (ConfigInvalidException e) { - repLog.error("Config file {} is invalid: {}", cfgPath, e.getMessage(), e); + repLog.atSevere().withCause(e).log("Config file %s is invalid: %s", cfgPath, e.getMessage()); } catch (IOException e) { - repLog.error("Cannot read {}: {}", cfgPath, e.getMessage(), e); + repLog.atSevere().withCause(e).log("Cannot read %s: %s", cfgPath, e.getMessage()); } this.replicateAllOnPluginStart = config.getBoolean("gerrit", "replicateOnStartup", false); this.defaultForceUpdate = config.getBoolean("gerrit", "defaultForceUpdate", false); @@ -86,6 +86,11 @@ } @Override + public int getDistributionInterval() { + return config.getInt("replication", "distributionInterval", 0); + } + + @Override public int getMaxRefsToLog() { return maxRefsToLog; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java index fed09f9..60a9523 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationLogFile.java
@@ -28,6 +28,6 @@ systemLog, serverInfo, ReplicationQueue.REPLICATION_LOG_NAME, - new PatternLayout("[%d] [%X{" + PushOne.ID_MDC_KEY + "}] %m%n")); + new PatternLayout("[%d] %m%n")); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 869f728..a8ffeec 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,6 +14,8 @@ package com.googlesource.gerrit.plugins.replication; +import static java.util.concurrent.TimeUnit.SECONDS; + import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Queues; @@ -25,6 +27,7 @@ import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.server.events.EventDispatcher; import com.google.gerrit.server.git.WorkQueue; +import com.google.gerrit.util.logging.NamedFluentLogger; import com.google.inject.Inject; import com.google.inject.Provider; import com.googlesource.gerrit.plugins.replication.PushResultProcessing.GitUpdateProcessing; @@ -34,9 +37,9 @@ import java.util.Optional; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.eclipse.jgit.transport.URIish; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Manages automatic replication to remote repositories. 
*/ public class ReplicationQueue @@ -46,10 +49,11 @@ ProjectDeletedListener, HeadUpdatedListener { static final String REPLICATION_LOG_NAME = "replication_log"; - static final Logger repLog = LoggerFactory.getLogger(REPLICATION_LOG_NAME); + static final NamedFluentLogger repLog = NamedFluentLogger.forName(REPLICATION_LOG_NAME); private final ReplicationStateListener stateLog; + private final ReplicationConfig replConfig; private final WorkQueue workQueue; private final DynamicItem<EventDispatcher> dispatcher; private final Provider<ReplicationDestinations> destinations; // For Guice circular dependency @@ -57,14 +61,17 @@ private volatile boolean running; private volatile boolean replaying; private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; + private Distributor distributor; @Inject ReplicationQueue( + ReplicationConfig rc, WorkQueue wq, Provider<ReplicationDestinations> rd, DynamicItem<EventDispatcher> dis, ReplicationStateListeners sl, ReplicationTasksStorage rts) { + replConfig = rc; workQueue = wq; dispatcher = dis; destinations = rd; @@ -81,15 +88,17 @@ replicationTasksStorage.resetAll(); firePendingEvents(); fireBeforeStartupEvents(); + distributor = new Distributor(workQueue); } } @Override public void stop() { running = false; + distributor.stop(); int discarded = destinations.get().shutdown(); if (discarded > 0) { - repLog.warn("Canceled {} replication events during shutdown", discarded); + repLog.atWarning().log("Canceled %d replication events during shutdown", discarded); } } @@ -110,17 +119,17 @@ @VisibleForTesting public void scheduleFullSync( Project.NameKey project, String urlMatch, ReplicationState state, boolean now) { - fire(project, urlMatch, PushOne.ALL_REFS, state, now); + fire(project, urlMatch, PushOne.ALL_REFS, state, now, false); } @Override public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { - fire(event.getProjectName(), event.getRefName()); + fire(event.getProjectName(), event.getRefName(), 
false); } - private void fire(String projectName, String refName) { + private void fire(String projectName, String refName, boolean isPersisted) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); - fire(Project.nameKey(projectName), null, refName, state, false); + fire(Project.nameKey(projectName), null, refName, state, false, isPersisted); state.markAllPushTasksScheduled(); } @@ -129,7 +138,8 @@ String urlMatch, String refName, ReplicationState state, - boolean now) { + boolean now, + boolean isPersisted) { if (!running) { stateLog.warn( "Replication plugin did not finish startup before event, event replication is postponed", @@ -141,12 +151,14 @@ for (Destination cfg : destinations.get().getAll(FilterType.ALL)) { if (cfg.wouldPushProject(project) && cfg.wouldPushRef(refName)) { for (URIish uri : cfg.getURIs(project, urlMatch)) { - replicationTasksStorage.create( - new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName())); + if (!isPersisted) { + replicationTasksStorage.create( + new ReplicateRefUpdate(project.get(), refName, uri, cfg.getRemoteConfigName())); + } cfg.schedule(project, refName, uri, state, now); } } else { - repLog.debug("Skipping ref {} on project {}", refName, project.get()); + repLog.atFine().log("Skipping ref %s on project %s", refName, project.get()); } } } @@ -159,8 +171,8 @@ for (ReplicationTasksStorage.ReplicateRefUpdate t : replicationTasksStorage.listWaiting()) { String eventKey = String.format("%s:%s", t.project, t.ref); if (!eventsReplayed.contains(eventKey)) { - repLog.info("Firing pending task {}", eventKey); - fire(t.project, t.ref); + repLog.atInfo().log("Firing pending task %s", eventKey); + fire(t.project, t.ref, true); eventsReplayed.add(eventKey); } } @@ -169,6 +181,30 @@ } } + private void pruneCompleted() { + // Queue tasks have wrappers around them so workQueue.getTasks() does not return the PushOnes. 
+ // We also cannot access them by taskId since PushOnes don't have a taskId, they do have + // an Id, but it is not the id assigned to the task in the queues. The tasks in the queue + // do use the same name as returned by toString() though, so that can be used to correlate + // PushOnes with queue tasks despite their wrappers. + Set<String> prunableTaskNames = new HashSet<>(); + for (Destination destination : destinations.get().getAll(FilterType.ALL)) { + prunableTaskNames.addAll(destination.getPrunableTaskNames()); + } + + for (WorkQueue.Task<?> task : workQueue.getTasks()) { + WorkQueue.Task.State state = task.getState(); + if (state == WorkQueue.Task.State.SLEEPING || state == WorkQueue.Task.State.READY) { + if (task instanceof WorkQueue.ProjectTask) { + if (prunableTaskNames.contains(task.toString())) { + repLog.atFine().log("Pruning externally completed task: %s", task); + task.cancel(false); + } + } + } + } + } + @Override public void onProjectDeleted(ProjectDeletedListener.Event event) { Project.NameKey p = Project.nameKey(event.getProjectName()); @@ -188,8 +224,8 @@ for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) { String eventKey = String.format("%s:%s", event.projectName(), event.refName()); if (!eventsReplayed.contains(eventKey)) { - repLog.info("Firing pending task {}", event); - fire(event.projectName(), event.refName()); + repLog.atInfo().log("Firing pending task %s", event); + fire(event.projectName(), event.refName(), false); eventsReplayed.add(eventKey); } } @@ -206,4 +242,49 @@ public abstract String refName(); } + + protected class Distributor implements WorkQueue.CancelableRunnable { + public ScheduledThreadPoolExecutor executor; + public ScheduledFuture<?> future; + + public Distributor(WorkQueue wq) { + int distributionInterval = replConfig.getDistributionInterval(); + if (distributionInterval > 0) { + executor = wq.createQueue(1, "Replication Distribution", false); + future = + executor.scheduleWithFixedDelay( + this, 
distributionInterval, distributionInterval, SECONDS); + } + } + + @Override + public void run() { + if (!running) { + return; + } + try { + firePendingEvents(); + pruneCompleted(); + } catch (Exception e) { + repLog.atSevere().withCause(e).log("error distributing tasks"); + } + } + + @Override + public void cancel() { + future.cancel(true); + } + + public void stop() { + if (executor != null) { + cancel(); + executor.getQueue().remove(this); + } + } + + @Override + public String toString() { + return "Replication Distributor"; + } + } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java index f2d55de..3e73033 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -31,19 +31,19 @@ @Override public void warn(String msg, ReplicationState... states) { stateWriteErr("Warning: " + msg, states); - repLog.warn(msg); + repLog.atWarning().log(msg); } @Override public void error(String msg, ReplicationState... states) { stateWriteErr("Error: " + msg, states); - repLog.error(msg); + repLog.atSevere().log(msg); } @Override public void error(String msg, Throwable t, ReplicationState... states) { stateWriteErr("Error: " + msg, states); - repLog.error(msg, t); + repLog.atSevere().withCause(t).log(msg); } private void stateWriteErr(String msg, ReplicationState[] states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java index d15f02d..5af0983 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -26,6 +26,7 @@ import java.io.IOException; import java.nio.file.DirectoryIteratorException; import java.nio.file.DirectoryStream; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -48,13 +49,19 @@ * task: * * <p><code> - * .../building/<tmp_name> new replication tasks under construction - * .../running/<sha1> running replication tasks - * .../waiting/<sha1> outstanding replication tasks + * .../building/<tmp_name> new replication tasks under construction + * .../running/<uri_sha1>/ lock for URI + * .../running/<uri_sha1>/<task_sha1> running replication tasks + * .../waiting/<task_sha1_NN_shard>/<task_sha1> outstanding replication tasks * </code> * + * <p>The URI lock is acquired by creating the directory and released by removing it. + * * <p>Tasks are moved atomically via a rename between those directories to indicate the current * state of each task. + * + * <p>Note: The .../waiting/<task_sha1_NN_shard> directories are never removed. This helps prevent + * failures when moving tasks to and from the shard directories from different hosts concurrently. 
*/ @Singleton public class ReplicationTasksStorage { @@ -62,26 +69,50 @@ private boolean disableDeleteForTesting; - public static class ReplicateRefUpdate { - public final String project; + public static class ReplicateRefUpdate extends UriUpdate { public final String ref; - public final String uri; - public final String remote; - public ReplicateRefUpdate(PushOne push, String ref) { - this(push.getProjectNameKey().get(), ref, push.getURI(), push.getRemoteName()); + public ReplicateRefUpdate(UriUpdate update, String ref) { + this(update.project, ref, update.uri, update.remote); } public ReplicateRefUpdate(String project, String ref, URIish uri, String remote) { - this.project = project; + this(project, ref, uri.toASCIIString(), remote); + } + + protected ReplicateRefUpdate(String project, String ref, String uri, String remote) { + super(project, uri, remote); this.ref = ref; - this.uri = uri.toASCIIString(); + } + + @Override + public String toString() { + return "ref-update " + ref + " (" + super.toString() + ")"; + } + } + + public static class UriUpdate { + public final String project; + public final String uri; + public final String remote; + + public UriUpdate(PushOne push) { + this(push.getProjectNameKey().get(), push.getURI(), push.getRemoteName()); + } + + public UriUpdate(String project, URIish uri, String remote) { + this(project, uri.toASCIIString(), remote); + } + + public UriUpdate(String project, String uri, String remote) { + this.project = project; + this.uri = uri; this.remote = remote; } @Override public String toString() { - return "ref-update " + project + ":" + ref + " uri:" + uri + " remote:" + remote; + return "uri-update " + project + " uri:" + uri + " remote:" + remote; } } @@ -109,28 +140,65 @@ this.disableDeleteForTesting = deleteDisabled; } - public synchronized void start(PushOne push) { - for (String ref : push.getRefs()) { - new Task(new ReplicateRefUpdate(push, ref)).start(); + public synchronized boolean start(PushOne push) { + 
UriLock lock = new UriLock(push); + if (!lock.acquire()) { + return false; } + + boolean started = false; + for (String ref : push.getRefs()) { + started = new Task(lock, ref).start() || started; + } + + if (!started) { // No tasks left, likely replicated externally + lock.release(); + } + return started; } public synchronized void reset(PushOne push) { + UriLock lock = new UriLock(push); for (String ref : push.getRefs()) { - new Task(new ReplicateRefUpdate(push, ref)).reset(); + new Task(lock, ref).reset(); } + lock.release(); } public synchronized void resetAll() { - for (ReplicateRefUpdate r : listRunning()) { - new Task(r).reset(); + try (DirectoryStream<Path> dirs = Files.newDirectoryStream(createDir(runningUpdates))) { + for (Path dir : dirs) { + UriLock lock = null; + try { + for (ReplicateRefUpdate u : list(dir)) { + if (lock == null) { + lock = new UriLock(u); + } + new Task(u).reset(); + } + } catch (DirectoryIteratorException d) { + // iterating over the sub-directories is expected to have dirs disappear + Nfs.throwIfNotStaleFileHandle(d.getCause()); + } + if (lock != null) { + lock.release(); + } + } + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while aborting running tasks"); } } - public synchronized void finish(PushOne push) { - for (String ref : push.getRefs()) { - new Task(new ReplicateRefUpdate(push, ref)).finish(); + public boolean isWaiting(PushOne push) { + return push.getRefs().stream().map(ref -> new Task(push, ref)).anyMatch(Task::isWaiting); + } + + public void finish(PushOne push) { + UriLock lock = new UriLock(push); + for (ReplicateRefUpdate r : list(lock.runningDir)) { + new Task(lock, r.ref).finish(); } + lock.release(); } public synchronized List<ReplicateRefUpdate> listWaiting() { @@ -195,20 +263,73 @@ } } + private class UriLock { + public final UriUpdate update; + public final String uriKey; + public final Path runningDir; + + public UriLock(PushOne push) { + this(new UriUpdate(push)); + } + + public 
UriLock(UriUpdate update) { + this.update = update; + uriKey = sha1(update.uri).name(); + runningDir = createDir(runningUpdates).resolve(uriKey); + } + + public boolean acquire() { + try { + logger.atFine().log("MKDIR %s %s", runningDir, updateLog()); + Files.createDirectory(runningDir); + return true; + } catch (FileAlreadyExistsException e) { + return false; // already running, likely externally + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while starting uri %s", uriKey); + return true; // safer to risk a duplicate than to skip it + } + } + + public void release() { + try { + if (disableDeleteForTesting && Files.list(runningDir).findFirst().isPresent()) { + logger.atFine().log("DELETE %s %s DISABLED", runningDir, updateLog()); + return; + } + + logger.atFine().log("DELETE %s %s", runningDir, updateLog()); + Files.delete(runningDir); + } catch (IOException e) { + logger.atSevere().withCause(e).log("Error while releasing uri %s", uriKey); + } + } + + private String updateLog() { + return String.format("(%s => %s)", update.project, update.uri); + } + } + private class Task { public final ReplicateRefUpdate update; - public final String json; public final String taskKey; public final Path running; public final Path waiting; - public Task(ReplicateRefUpdate update) { - this.update = update; - json = GSON.toJson(update) + "\n"; + public Task(ReplicateRefUpdate r) { + this(new UriLock(r), r.ref); + } + + public Task(PushOne push, String ref) { + this(new UriLock(push), ref); + } + + public Task(UriLock lock, String ref) { + update = new ReplicateRefUpdate(lock.update, ref); String key = update.project + "\n" + update.ref + "\n" + update.uri + "\n" + update.remote; taskKey = sha1(key).name(); - running = createDir(runningUpdates).resolve(taskKey); - waiting = createDir(waitingUpdates).resolve(taskKey); + running = lock.runningDir.resolve(taskKey); + waiting = createDir(waitingUpdates.resolve(taskKey.substring(0, 2))).resolve(taskKey); } public 
String create() { @@ -216,6 +337,7 @@ return taskKey; } + String json = GSON.toJson(update) + "\n"; try { Path tmp = Files.createTempFile(createDir(buildingUpdates), taskKey, null); logger.atFine().log("CREATE %s %s", tmp, updateLog()); @@ -228,14 +350,19 @@ return taskKey; } - public void start() { + public boolean start() { rename(waiting, running); + return Files.exists(running); } public void reset() { rename(running, waiting); } + public boolean isWaiting() { + return Files.exists(waiting); + } + public void finish() { if (disableDeleteForTesting) { logger.atFine().log("DELETE %s %s DISABLED", running, updateLog()); @@ -246,7 +373,7 @@ logger.atFine().log("DELETE %s %s", running, updateLog()); Files.delete(running); } catch (IOException e) { - logger.atSevere().withCause(e).log("Error while deleting task %s", taskKey); + logger.atSevere().withCause(e).log("Error while finishing task %s", taskKey); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java index bcb1e2f..f7071d8 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RunwayStatus.java
@@ -27,6 +27,10 @@ return new RunwayStatus(false, inFlightPushId); } + public static RunwayStatus deniedExternal() { + return new RunwayStatus(false, -1); + } + private final boolean allowed; private final int inFlightPushId; @@ -43,6 +47,10 @@ return !allowed && inFlightPushId == 0; } + public boolean isExternalInflight() { + return !allowed && inFlightPushId == -1; + } + public int getInFlightPushId() { return inFlightPushId; }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java index ffa6be1..fdbd5e7 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/UpdateHeadTask.java
@@ -58,7 +58,8 @@ return; } - repLog.warn("Cannot update HEAD of project {} on remote site {}.", project, replicateURI); + repLog.atWarning().log( + "Cannot update HEAD of project %s on remote site %s.", project, replicateURI); } @Override
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 0aad73b..fc3352d 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -94,6 +94,20 @@ : Timeout for SSH connections. If 0, there is no timeout and the client waits indefinitely. By default, 2 minutes. +replication.distributionInterval +: Interval in seconds for running the replication distributor. When + run, the replication distributor will add all persisted waiting tasks + to the queue to ensure that externally loaded tasks are visible to + the current process. If zero, the replication distributor is turned off. By + default, zero. + + Turning this on is likely only useful when there are other processes + (such as other masters in the same cluster) writing to the same + persistence store. To ensure that updates are seen well before their + replicationDelay expires when the distributor is used, the recommended + value for this is approximately the smallest remote.NAME.replicationDelay + divided by 5. + replication.lockErrorMaxRetries : Number of times to retry a replication operation if a lock error is detected.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java index c09fcd1..94f0dc4 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/PushOneTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -245,9 +246,9 @@ return push; } - private void setupProjectCacheMock() throws IOException { + private void setupProjectCacheMock() { projectCacheMock = mock(ProjectCache.class); - when(projectCacheMock.checkedGet(projectNameKey)).thenReturn(projectStateMock); + when(projectCacheMock.get(projectNameKey)).thenReturn(Optional.of(projectStateMock)); } private void setupTransportMock() throws NotSupportedException, TransportException {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java index efacae7..79b05cc 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfigTest.java
@@ -65,4 +65,25 @@ assertThatContainsDestination(destinations, remoteName1, remoteUrl1); assertThatContainsDestination(destinations, remoteName2, remoteUrl2); } + + @Test + public void shouldSkipFetchRefSpecs() throws Exception { + FileBasedConfig config = newReplicationConfig(); + String pushRemote = "pushRemote"; + final String aRemoteURL = "ssh://somewhere/${name}.git"; + config.setString("remote", pushRemote, "url", aRemoteURL); + + String fetchRemote = "fetchRemote"; + config.setString("remote", fetchRemote, "url", aRemoteURL); + config.setString("remote", fetchRemote, "fetch", "refs/*:refs/*"); + config.save(); + + DestinationsCollection destinationsCollections = + newDestinationsCollections(newReplicationFileBasedConfig()); + destinationsCollections.startup(workQueueMock); + List<Destination> destinations = destinationsCollections.getAll(FilterType.ALL); + assertThat(destinations).hasSize(1); + + assertThatIsDestination(destinations.get(0), pushRemote, aRemoteURL); + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java index 3eb6baf..61d0b9b 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -73,6 +73,7 @@ private Path gitPath; private Path storagePath; private FileBasedConfig config; + private ReplicationConfig replicationConfig; private ReplicationTasksStorage tasksStorage; @Override @@ -90,6 +91,7 @@ super.setUpTestPlugin(); pluginDataDir = plugin.getSysInjector().getInstance(Key.get(Path.class, PluginData.class)); + replicationConfig = plugin.getSysInjector().getInstance(ReplicationConfig.class); storagePath = pluginDataDir.resolve("ref-updates"); tasksStorage = plugin.getSysInjector().getInstance(ReplicationTasksStorage.class); cleanupReplicationTasks(); @@ -400,6 +402,20 @@ waitUntil(() -> tasksStorage.listRunning().size() == 0); } + @Test + public void shouldCleanupBothTasksAndLocksAfterNewProjectReplication() throws Exception { + tasksStorage.disableDeleteForTesting(false); + setReplicationDestination("task_cleanup_locks_project", "replica", ALL_PROJECTS); + config.setInt("remote", "task_cleanup_locks_project", "replicationRetry", 0); + config.save(); + reloadConfig(); + assertThat(tasksStorage.listRunning()).hasSize(0); + Project.NameKey sourceProject = createTestProject("task_cleanup_locks_project"); + + waitUntil(() -> nonEmptyProjectExists(Project.nameKey(sourceProject + "replica.git"))); + waitUntil(() -> isTaskCleanedUp()); + } + private Ref getRef(Repository repo, String branchName) throws IOException { return repo.getRefDatabase().exactRef(branchName); } @@ -499,6 +515,16 @@ } } + private boolean isTaskCleanedUp() { + Path refUpdates = replicationConfig.getEventsDirectory().resolve("ref-updates"); + Path runningUpdates = refUpdates.resolve("running"); + try { + return Files.list(runningUpdates).count() == 0; + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + private boolean nonEmptyProjectExists(Project.NameKey name) { try (Repository r = repoManager.openRepository(name)) { return !r.getAllRefsByPeeledObjectId().isEmpty();