Merge branch 'stable-3.0'
* stable-3.0:
Clean up ReplicationQueue tests
Convert ReferenceUpdateEvent class to AutoValue
Fix issue with dropping events on start
Change-Id: I5a4bc84a829600865fdd551152219cd1bbe5aa91
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 5bc7320..3e17167 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -14,7 +14,9 @@
package com.googlesource.gerrit.plugins.replication;
+import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Queues;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.events.HeadUpdatedListener;
import com.google.gerrit.extensions.events.LifecycleListener;
@@ -30,6 +32,7 @@
import com.googlesource.gerrit.plugins.replication.ReplicationTasksStorage.ReplicateRefUpdate;
import java.util.HashSet;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import org.eclipse.jgit.transport.URIish;
import org.slf4j.Logger;
@@ -52,6 +55,7 @@
private final ReplicationTasksStorage replicationTasksStorage;
private volatile boolean running;
private volatile boolean replaying;
+ private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
@Inject
ReplicationQueue(
@@ -65,6 +69,7 @@
destinations = rd;
stateLog = sl;
replicationTasksStorage = rts;
+ beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
}
@Override
@@ -73,6 +78,7 @@
destinations.get().startup(workQueue);
running = true;
firePendingEvents();
+ fireBeforeStartupEvents();
}
}
@@ -125,7 +131,10 @@
private void onGitReferenceUpdated(String projectName, String refName) {
ReplicationState state = new ReplicationState(new GitUpdateProcessing(dispatcher.get()));
if (!running) {
- stateLog.warn("Replication plugin did not finish startup before event", state);
+ stateLog.warn(
+ "Replication plugin did not finish startup before event, event replication is postponed",
+ state);
+ beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(projectName, refName));
return;
}
@@ -173,4 +182,28 @@
destinations.get().getURIs(Optional.empty(), p, FilterType.ALL).entries().stream()
.forEach(e -> e.getKey().scheduleUpdateHead(e.getValue(), p, event.getNewHeadName()));
}
+
+  /**
+   * Replays the ref-update events that {@code onGitReferenceUpdated} queued while the plugin was
+   * not running.
+   *
+   * <p>The queue is drained rather than merely iterated, so the events are not kept in memory
+   * after being fired and a later {@code start()} does not fire them a second time. Events with
+   * the same project and ref are fired only once, in first-arrival order.
+   */
+  private void fireBeforeStartupEvents() {
+    // Drain into a local, de-duplicating snapshot before firing anything. onGitReferenceUpdated
+    // queues the event again whenever 'running' is false; taking the snapshot first means such an
+    // event stays queued for the next start() instead of being consumed again by this method.
+    // De-duplication relies on the equals/hashCode that AutoValue generates for the event type.
+    Set<ReferenceUpdatedEvent> pending = new java.util.LinkedHashSet<>();
+    for (ReferenceUpdatedEvent queued = beforeStartupEventsQueue.poll();
+        queued != null;
+        queued = beforeStartupEventsQueue.poll()) {
+      pending.add(queued);
+    }
+    for (ReferenceUpdatedEvent event : pending) {
+      repLog.info("Firing pending task {}", event);
+      onGitReferenceUpdated(event.projectName(), event.refName());
+    }
+  }
+
+  /**
+   * Value type pairing a project name with a ref name. {@code onGitReferenceUpdated} queues one
+   * instance for every event it receives while the plugin is not running.
+   */
+  @AutoValue
+  abstract static class ReferenceUpdatedEvent {
+
+    /** Name of the project the updated ref belongs to. */
+    public abstract String projectName();
+
+    /** Name of the ref that was updated. */
+    public abstract String refName();
+
+    /** Creates an event for {@code refName} in {@code projectName}. */
+    static ReferenceUpdatedEvent create(String projectName, String refName) {
+      return new AutoValue_ReplicationQueue_ReferenceUpdatedEvent(projectName, refName);
+    }
+  }
}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index d39cf79..eed5158 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -306,8 +306,26 @@
}
}
- private Project.NameKey createTestProject(String name) throws Exception {
- return projectOperations.newProject().name(name).create();
+  /**
+   * Checks that a ref update raised while the replication queue is stopped is still replicated
+   * to the destination once the queue is started again.
+   */
+  @Test
+  public void shouldNotDropEventsWhenStarting() throws Exception {
+    Project.NameKey targetProject = createTestProject(project + "replica");
+
+    setReplicationDestination("foo", "replica", ALL_PROJECTS);
+    reloadConfig();
+
+    // The change is created between stop() and start(), so its ref-update event is raised while
+    // the queue is stopped.
+    replicationQueueStop();
+    Result pushResult = createChange();
+    replicationQueueStart();
+
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    try (Repository repo = repoManager.openRepository(targetProject)) {
+      // Wait for the change ref to show up in the replica repository before asserting on it.
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
}
private Ref getRef(Repository repo, String branchName) throws IOException {
@@ -348,11 +366,35 @@
}
+  /** Calls {@code reload()} on the plugin's {@link AutoReloadConfigDecorator} instance. */
private void reloadConfig() {
- plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).reload();
+    getAutoReloadConfigDecoratorInstance().reload();
}
+  /** Calls {@code shutdown()} on the plugin's {@link DestinationsCollection} instance. */
private void shutdownDestinations() {
- plugin.getSysInjector().getInstance(DestinationsCollection.class).shutdown();
+    getInstance(DestinationsCollection.class).shutdown();
+  }
+
+  /** Starts the plugin's {@link ReplicationQueue} instance. */
+  private void replicationQueueStart() {
+    ReplicationQueue queue = getReplicationQueueInstance();
+    queue.start();
+  }
+
+  /** Stops the plugin's {@link ReplicationQueue} instance. */
+  private void replicationQueueStop() {
+    ReplicationQueue queue = getReplicationQueueInstance();
+    queue.stop();
+  }
+
+  /** Returns the {@link AutoReloadConfigDecorator} supplied by the plugin's sys injector. */
+  private AutoReloadConfigDecorator getAutoReloadConfigDecoratorInstance() {
+    return plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class);
+  }
+
+  /** Returns the {@link ReplicationQueue} supplied by the plugin's sys injector. */
+  private ReplicationQueue getReplicationQueueInstance() {
+    return plugin.getSysInjector().getInstance(ReplicationQueue.class);
+  }
+
+  /**
+   * Looks up an instance of the given class from the plugin's sys injector.
+   *
+   * @param classObj the type to request from the injector
+   * @return whatever the injector returns for {@code classObj}
+   */
+  private <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  /**
+   * Creates a new project with the given name through {@code projectOperations}.
+   *
+   * @param name the name to give the new project
+   * @return the name key of the created project
+   */
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
}
private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {