Merge branch 'stable-2.15' into stable-2.16 * stable-2.15: Fix issue with dropping events on start Change-Id: I8d1b52bcf8f8c288892492b4375178756c784ec3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java index 91f7ae1..d73ab7b 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java +++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -18,7 +18,9 @@ import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Strings; +import com.google.common.collect.Queues; import com.google.gerrit.common.Nullable; import com.google.gerrit.extensions.events.GitReferenceUpdatedListener; import com.google.gerrit.extensions.events.HeadUpdatedListener; @@ -36,6 +38,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Optional; +import java.util.Queue; import java.util.Set; import org.eclipse.jgit.transport.URIish; import org.slf4j.Logger; @@ -71,6 +74,7 @@ private final ReplicationTasksStorage replicationTasksStorage; private volatile boolean running; private volatile boolean replaying; + private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue; @Inject ReplicationQueue( @@ -86,6 +90,7 @@ stateLog = sl; adminApiFactory = aaf; replicationTasksStorage = rts; + beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue(); } @Override @@ -94,6 +99,7 @@ config.startup(workQueue); running = true; firePendingEvents(); + fireBeforeStartupEvents(); } } @@ -146,7 +152,10 @@ private void onGitReferenceUpdated(String projectName, String refName) { ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get())); if (!running) { - stateLog.warn("Replication plugin did not finish startup before event", state); + stateLog.warn( + "Replication plugin did not finish startup before event, event replication is postponed", + state); + beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName)); return; } @@ -196,6 +205,18 @@ } } + private void fireBeforeStartupEvents() { + Set<String> eventsReplayed = new HashSet<>(); + for (ReferenceUpdatedEvent event : beforeStartupEventsQueue) { + String eventKey = String.format("%s:%s", event.getProjectName(), event.getRefName()); + if (!eventsReplayed.contains(eventKey)) { + repLog.info("Firing pending task {}", event); + onGitReferenceUpdated(event.getProjectName(), event.getRefName()); + eventsReplayed.add(eventKey); + } + } + } + private Set<URIish> getURIs( @Nullable String remoteName, Project.NameKey projectName, FilterType filterType) { if (config.getDestinations(filterType).isEmpty()) { @@ -299,4 +320,34 @@ private void warnCannotPerform(String op, URIish uri) { repLog.warn("Cannot {} on remote site {}.", op, uri); } + + private static class ReferenceUpdatedEvent { + private String projectName; + private String refName; + + public ReferenceUpdatedEvent(String projectName, String refName) { + this.projectName = projectName; + this.refName = refName; + } + + public String getProjectName() { + return projectName; + } + + public String getRefName() { + return refName; + } + + @Override + public int hashCode() { + return Objects.hashCode(projectName, refName); + } + + @Override + public boolean equals(Object obj) { + return (obj instanceof ReferenceUpdatedEvent) + && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName) + && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName); + } + } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java new file mode 100644 index 0000000..2a395ca --- /dev/null +++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -0,0 +1,134 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.googlesource.gerrit.plugins.replication; + +import static com.google.common.truth.Truth.assertThat; +import static java.util.stream.Collectors.toList; + +import com.google.common.base.Stopwatch; +import com.google.gerrit.acceptance.LightweightPluginDaemonTest; +import com.google.gerrit.acceptance.PushOneCommit.Result; +import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.UseLocalDisk; +import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.server.config.SitePaths; +import com.google.inject.Inject; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.storage.file.FileBasedConfig; +import org.eclipse.jgit.util.FS; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@UseLocalDisk +@TestPlugin( + name = "replication", + sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule") +public class ReplicationQueueIT extends LightweightPluginDaemonTest { + private static final Logger logger = LoggerFactory.getLogger(ReplicationQueueIT.class); + + private static final int TEST_REPLICATION_DELAY = 1; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2); + + @Inject private SitePaths sitePaths; + private Path gitPath; + private FileBasedConfig config; + + @Override + public void setUpTestPlugin() throws Exception { + gitPath = sitePaths.site_path.resolve("git"); + config = + new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED); + + setReplicationDestination("foo", "replica"); + super.setUpTestPlugin(); + } + + @Test + public void shouldNotDropEventsWhenStarting() throws Exception { + Project.NameKey targetProject = createProject("projectreplica"); + + replicationQueueStop(); + Result pushResult = createChange(); + replicationQueueStart(); + + RevCommit sourceCommit = pushResult.getCommit(); + String sourceRef = pushResult.getPatchSet().getRefName(); + + try (Repository repo = repoManager.openRepository(targetProject)) { + waitUntil(() -> checkedGetRef(repo, sourceRef) != null); + Ref targetBranchRef = getRef(repo, sourceRef); + assertThat(targetBranchRef).isNotNull(); + assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId()); + } + } + + private Ref getRef(Repository repo, String branchName) throws IOException { + return repo.getRefDatabase().exactRef(branchName); + } + + private Ref checkedGetRef(Repository repo, String branchName) { + try { + return repo.getRefDatabase().exactRef(branchName); + } catch (Exception e) { + logger.error("failed to get ref %s in repo %s", branchName, repo); + return null; + } + } + + private void setReplicationDestination(String remoteName, String replicaSuffix) + throws IOException { + setReplicationDestination(remoteName, Arrays.asList(replicaSuffix)); + } + + private void setReplicationDestination(String remoteName, List<String> replicaSuffixes) + throws IOException { + + List<String> replicaUrls = + replicaSuffixes.stream() + .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString()) + .collect(toList()); + config.setStringList("remote", remoteName, "url", replicaUrls); + config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY); + config.save(); + } + + private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException { + Stopwatch stopwatch = Stopwatch.createStarted(); + while (!waitCondition.get()) { + if (stopwatch.elapsed().compareTo(TEST_TIMEOUT) > 0) { + throw new InterruptedException(); + } + TimeUnit.MILLISECONDS.sleep(50); + } + } + + private void replicationQueueStart() { + plugin.getSysInjector().getInstance(ReplicationQueue.class).start(); + } + + private void replicationQueueStop() { + plugin.getSysInjector().getInstance(ReplicationQueue.class).stop(); + } +}