Merge branch 'stable-2.16' into stable-3.0
* stable-2.16:
ReplicationIT: speedup drain replication queue tests
ReplicationIT: Remove unused variables in tests
Reformat with GJF
Option to drain the event queue before shutdown
Change-Id: I02a30f057aba348f2977c12b669c17a560d710b1
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index 2a329e2..679776f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -723,6 +723,14 @@
return config.getMaxRetries();
}
+ /**
+  * Returns the number of queue-drain attempts for this destination, as reported by
+  * {@code config.getDrainQueueAttempts()}.
+  */
+ public int getDrainQueueAttempts() {
+ return config.getDrainQueueAttempts();
+ }
+
+  /**
+   * Returns the configured replication delay scaled by 1000, for use as a sleep interval.
+   *
+   * <p>NOTE: despite the method name, the result is the value of {@code config.getDelay()}
+   * multiplied by 1000, i.e. milliseconds when the configured delay is expressed in seconds. The
+   * name is kept unchanged so that existing callers keep compiling.
+   *
+   * @return the scaled delay, capped at {@link Integer#MAX_VALUE} instead of wrapping around to a
+   *     negative number when the product does not fit into an {@code int}
+   */
+  public int getReplicationDelaySeconds() {
+    // Multiply in long arithmetic: an int product silently overflows for large delays and a
+    // negative result would make Thread.sleep() throw IllegalArgumentException.
+    return (int) Math.min(Integer.MAX_VALUE, config.getDelay() * 1000L);
+  }
+
private static boolean matches(URIish uri, String urlMatch) {
if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
return true;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index b2d0de2..f688cfc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -22,10 +22,12 @@
public class DestinationConfiguration {
static final int DEFAULT_REPLICATION_DELAY = 15;
static final int DEFAULT_RESCHEDULE_DELAY = 3;
+ static final int DEFAULT_DRAIN_QUEUE_ATTEMPTS = 0;
private final int delay;
private final int rescheduleDelay;
private final int retryDelay;
+ private final int drainQueueAttempts;
private final int lockErrorMaxRetries;
private final ImmutableList<String> adminUrls;
private final int poolThreads;
@@ -50,6 +52,8 @@
projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+ drainQueueAttempts =
+ Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS));
poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
@@ -77,6 +81,10 @@
return retryDelay;
}
+ /**
+  * Returns the value of the {@code drainQueueAttempts} field, which the constructor reads from
+  * the remote's "drainQueueAttempts" setting (default {@code DEFAULT_DRAIN_QUEUE_ATTEMPTS}) and
+  * clamps to be non-negative.
+  */
+ public int getDrainQueueAttempts() {
+ return drainQueueAttempts;
+ }
+
public int getPoolThreads() {
return poolThreads;
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index a075f12..ea6d361 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -311,11 +311,52 @@
public int shutdown() {
int discarded = 0;
for (Destination cfg : destinations) {
- discarded += cfg.shutdown();
+ // Give the destination a chance to drain its event queue first; a queue that is still
+ // non-empty afterwards is only logged, it does not abort the shutdown.
+ try {
+ drainReplicationEvents(cfg);
+ } catch (EventQueueNotEmptyException e) {
+ logger.atWarning().log("Event queue not empty: %s", e.getMessage());
+ } finally {
+ // Always shut the destination down, whether or not draining succeeded, and add the value
+ // it returns to the running total.
+ discarded += cfg.shutdown();
+ }
}
return discarded;
}
+  /**
+   * Waits for the replication event queue of {@code destination} to become empty.
+   *
+   * <p>Returns immediately when the destination is configured with zero drain attempts.
+   * Otherwise the pending and in-flight event counts are polled, sleeping for the interval
+   * returned by {@code destination.getReplicationDelaySeconds()} between polls, until the queue
+   * is empty, the configured number of attempts is used up, or the waiting thread is
+   * interrupted. On interruption the wait stops and the thread's interrupt status is restored.
+   *
+   * @param destination the destination whose queue should be drained
+   * @throws EventQueueNotEmptyException if events are still pending or in flight when the wait
+   *     ends
+   */
+  void drainReplicationEvents(Destination destination) throws EventQueueNotEmptyException {
+    int drainQueueAttempts = destination.getDrainQueueAttempts();
+    if (drainQueueAttempts == 0) {
+      return;
+    }
+    int pending = destination.getQueueInfo().pending.size();
+    int inFlight = destination.getQueueInfo().inFlight.size();
+    boolean interrupted = false;
+
+    while (!interrupted && (inFlight > 0 || pending > 0) && drainQueueAttempts > 0) {
+      try {
+        logger.atInfo().log(
+            "Draining replication events, postpone shutdown. Events left: inFlight %d, pending %d",
+            inFlight, pending);
+        // The value is handed to Thread.sleep(), which interprets it as milliseconds.
+        Thread.sleep(destination.getReplicationDelaySeconds());
+      } catch (InterruptedException ie) {
+        logger.atWarning().withCause(ie).log(
+            "Wait for replication events to drain has been interrupted");
+        // Somebody asked this thread to stop: give up waiting instead of sleeping through the
+        // remaining attempts.
+        interrupted = true;
+      }
+      // Refresh the counts even after an interrupt so the final check reports current numbers.
+      pending = destination.getQueueInfo().pending.size();
+      inFlight = destination.getQueueInfo().inFlight.size();
+      drainQueueAttempts--;
+    }
+
+    if (interrupted) {
+      // Catching InterruptedException clears the interrupt status; restore it so that callers
+      // further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+    }
+
+    if (pending > 0 || inFlight > 0) {
+      throw new EventQueueNotEmptyException(
+          String.format("Pending: %d - InFlight: %d", pending, inFlight));
+    }
+  }
+
+  /**
+   * Thrown by {@code drainReplicationEvents} when replication events are still pending or in
+   * flight after the wait for the queue to drain has ended.
+   */
+  public static class EventQueueNotEmptyException extends Exception {
+    // Exception is Serializable; pin the serial version so it does not depend on compiler
+    // generated defaults.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception.
+     *
+     * @param errorMessage description of the events left in the queue
+     */
+    public EventQueueNotEmptyException(String errorMessage) {
+      super(errorMessage);
+    }
+  }
+
FileBasedConfig getConfig() {
return config;
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index f58845d..32bc630 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -297,6 +297,20 @@
By default, use replication.maxRetries.
+remote.NAME.drainQueueAttempts
+: Maximum number of attempts to drain the replication event queue before
+ stopping the plugin.
+
+ When stopping the plugin, the shutdown will be delayed trying to drain
+ the event queue.
+
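+ The maximum delay is "drainQueueAttempts" * "replicationDelay" seconds
+ for this remote. Remotes are drained one after another, so the delays
+ of all remotes that have draining enabled add up.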
+
+ When not set or set to 0, the queue is not drained and the pending
+ replication events are cancelled.
+
+ By default, do not drain replication events.
+
remote.NAME.threads
: Number of worker threads to dedicate to pushing to the
repositories described by this remote. Each thread can push
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
index 31aab02..03d9295 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ReplicationIT.java
@@ -245,6 +245,52 @@
}
}
+ // Without a "drainQueueAttempts" setting on the remote, a change created right before
+ // shutdownConfig() is expected not to show up in the target repository.
+ // NOTE(review): the test name says "WhenReloading" but the body calls shutdownConfig().
+ @Test
+ public void shouldNotDrainTheQueueWhenReloading() throws Exception {
+ // Setup repo to replicate
+ Project.NameKey targetProject =
+ projectOperations.newProject().name(project + "replica").create();
+ String remoteName = "doNotDrainQueue";
+ setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+ Result pushResult = createChange();
+ shutdownConfig();
+
+ // NOTE(review): the return value of getCommit() is not used here.
+ pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ // Presumably waitUntil() reports its timeout as an InterruptedException, which is how the
+ // missing ref is detected - TODO confirm against waitUntil().
+ exception.expect(InterruptedException.class);
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+ }
+ }
+
+ // With "drainQueueAttempts" set to 2 on the remote, a change created right before
+ // shutdownConfig() is expected to reach the target repository.
+ // NOTE(review): the test name says "WhenReloading" but the step under test is shutdownConfig().
+ @Test
+ public void shouldDrainTheQueueWhenReloading() throws Exception {
+ // Setup repo to replicate
+ Project.NameKey targetProject =
+ projectOperations.newProject().name(project + "replica").create();
+ String remoteName = "drainQueue";
+ setReplicationDestination(remoteName, "replica", ALL_PROJECTS);
+
+ // Enable draining for this remote and reload so the new setting is picked up.
+ config.setInt("remote", remoteName, "drainQueueAttempts", 2);
+ config.save();
+ reloadConfig();
+
+ Result pushResult = createChange();
+ shutdownConfig();
+
+ RevCommit sourceCommit = pushResult.getCommit();
+ String sourceRef = pushResult.getPatchSet().getRefName();
+
+ // The ref must appear in the target repository and point at the pushed commit.
+ try (Repository repo = repoManager.openRepository(targetProject)) {
+ waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+ Ref targetBranchRef = getRef(repo, sourceRef);
+ assertThat(targetBranchRef).isNotNull();
+ assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+ }
+ }
+
private Ref getRef(Repository repo, String branchName) throws IOException {
return repo.getRefDatabase().exactRef(branchName);
}
@@ -285,6 +331,10 @@
plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).forceReload();
}
+ // Calls shutdown() on the AutoReloadConfigDecorator instance obtained from the plugin's sys
+ // injector.
+ private void shutdownConfig() {
+ plugin.getSysInjector().getInstance(AutoReloadConfigDecorator.class).shutdown();
+ }
+
private List<ReplicateRefUpdate> listReplicationTasks(String refRegex) {
Pattern refmaskPattern = Pattern.compile(refRegex);
return tasksStorage.list().stream()