Merge branch 'stable-2.15' into stable-2.16
* stable-2.15:
Fix issue with dropping events on start
Change-Id: I8d1b52bcf8f8c288892492b4375178756c784ec3
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 91f7ae1..d73ab7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -18,7 +18,9 @@
import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Strings;
+import com.google.common.collect.Queues;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
@@ -36,6 +38,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
@@ -71,6 +74,7 @@
private final ReplicationTasksStorage replicationTasksStorage;
private volatile boolean running;
private volatile boolean replaying;
+ private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
@Inject
ReplicationQueue(
@@ -86,6 +90,7 @@
stateLog = sl;
adminApiFactory = aaf;
replicationTasksStorage = rts;
+ beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@Override
@@ -94,6 +99,7 @@
config.startup(workQueue);
running = true;
firePendingEvents();
+ fireBeforeStartupEvents();
}
}
@@ -146,7 +152,10 @@
private void onGitReferenceUpdated(String projectName, String refName) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
- stateLog.warn("Replication plugin did not finish startup before event", state);
+ stateLog.warn(
+ "Replication plugin did not finish startup before event, event replication is postponed",
+ state);
+ beforeStartupEventsQueue.add(new ReferenceUpdatedEvent(projectName, refName));
return;
}
@@ -196,6 +205,18 @@
}
}
+ /**
+  * Fires the reference-update events that were queued while the plugin was not yet running.
+  *
+  * <p>Events are removed from {@code beforeStartupEventsQueue} as they are processed, so the queue
+  * does not keep them after startup and a later stop()/start() cycle does not fire them a second
+  * time. Events for the same project and ref are fired only once per invocation.
+  */
+ private void fireBeforeStartupEvents() {
+   // ReferenceUpdatedEvent defines equals/hashCode over (projectName, refName), so the event
+   // itself is the de-duplication key.
+   Set<ReferenceUpdatedEvent> eventsReplayed = new HashSet<>();
+   ReferenceUpdatedEvent event;
+   // Stop draining if the queue is no longer running: onGitReferenceUpdated() would only put the
+   // event back into this queue, and the remaining events must stay queued for the next start.
+   while (running && (event = beforeStartupEventsQueue.poll()) != null) {
+     // Set.add returns false when an equal event has already been fired in this pass.
+     if (eventsReplayed.add(event)) {
+       repLog.info("Firing pending task {}", event);
+       onGitReferenceUpdated(event.getProjectName(), event.getRefName());
+     }
+   }
+ }
+
private Set<URIish> getURIs(
@Nullable String remoteName, Project.NameKey projectName, FilterType filterType) {
if (config.getDestinations(filterType).isEmpty()) {
@@ -299,4 +320,34 @@
private void warnCannotPerform(String op, URIish uri) {
repLog.warn("Cannot {} on remote site {}.", op, uri);
}
+
+ private static class ReferenceUpdatedEvent {
+ private String projectName;
+ private String refName;
+
+ public ReferenceUpdatedEvent(String projectName, String refName) {
+ this.projectName = projectName;
+ this.refName = refName;
+ }
+
+ public String getProjectName() {
+ return projectName;
+ }
+
+ public String getRefName() {
+ return refName;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(projectName, refName);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return (obj instanceof ReferenceUpdatedEvent)
+ && Objects.equal(projectName, ((ReferenceUpdatedEvent) obj).projectName)
+ && Objects.equal(refName, ((ReferenceUpdatedEvent) obj).refName);
+ }
+ }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
new file mode 100644
index 0000000..2a395ca
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationQueueIT.java
@@ -0,0 +1,134 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.base.Stopwatch;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test checking that a ref update received while the {@link ReplicationQueue} is
+ * stopped is still replicated once the queue is started again.
+ */
+@UseLocalDisk
+@TestPlugin(
+    name = "replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.ReplicationModule")
+public class ReplicationQueueIT extends LightweightPluginDaemonTest {
+  private static final Logger logger = LoggerFactory.getLogger(ReplicationQueueIT.class);
+
+  // Written to the "replicationDelay" setting of the test remote.
+  private static final int TEST_REPLICATION_DELAY = 1;
+  // Maximum time waitUntil() polls before giving up: twice the replication delay, in seconds.
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  private Path gitPath;
+  private FileBasedConfig config;
+
+  /**
+   * Writes {@code replication.config} with a single remote "foo" before delegating to the parent
+   * class set-up.
+   */
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+
+    setReplicationDestination("foo", "replica");
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldNotDropEventsWhenStarting() throws Exception {
+    Project.NameKey targetProject = createProject("projectreplica");
+
+    // Create the change while the queue is stopped, then start the queue again.
+    replicationQueueStop();
+    Result pushResult = createChange();
+    replicationQueueStart();
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().getRefName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  /** Looks up {@code branchName} by its exact name in the ref database of {@code repo}. */
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  /**
+   * Same lookup as {@link #getRef}, but usable inside a {@link Supplier}: any failure is logged
+   * and reported as {@code null} instead of being thrown.
+   */
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return getRef(repo, branchName);
+    } catch (Exception e) {
+      // SLF4J substitutes "{}" placeholders, not printf-style "%s"; the trailing exception
+      // argument is logged as the cause.
+      logger.error("failed to get ref {} in repo {}", branchName, repo, e);
+      return null;
+    }
+  }
+
+  private void setReplicationDestination(String remoteName, String replicaSuffix)
+      throws IOException {
+    setReplicationDestination(remoteName, Arrays.asList(replicaSuffix));
+  }
+
+  /**
+   * Stores one URL per suffix, plus the test replication delay, for remote {@code remoteName} and
+   * saves the configuration file.
+   */
+  private void setReplicationDestination(String remoteName, List<String> replicaSuffixes)
+      throws IOException {
+
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    config.setStringList("remote", remoteName, "url", replicaUrls);
+    config.setInt("remote", remoteName, "replicationDelay", TEST_REPLICATION_DELAY);
+    config.save();
+  }
+
+  /**
+   * Polls {@code waitCondition} every 50 ms until it returns true.
+   *
+   * @throws InterruptedException if the condition is still false after {@code TEST_TIMEOUT}, or if
+   *     the thread is interrupted while sleeping
+   */
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    while (!waitCondition.get()) {
+      if (stopwatch.elapsed().compareTo(TEST_TIMEOUT) > 0) {
+        // Exception type kept as before; the message makes a timeout recognizable in test output.
+        throw new InterruptedException("Condition not met within " + TEST_TIMEOUT);
+      }
+      TimeUnit.MILLISECONDS.sleep(50);
+    }
+  }
+
+  private void replicationQueueStart() {
+    plugin.getSysInjector().getInstance(ReplicationQueue.class).start();
+  }
+
+  private void replicationQueueStop() {
+    plugin.getSysInjector().getInstance(ReplicationQueue.class).stop();
+  }
+}