Merge branch 'stable-3.12'
* stable-3.12:
Introduce index.initialDelay for delaying the indexing on peer nodes
Change-Id: I004b24ee49430a4ce491f53b63cc190972e03ab9
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index 8d7c86a..f75bdaa 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -61,6 +61,7 @@
// common parameters to cache and index sections
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
+ static final String INITIAL_DELAY = "initialDelay";
static final String BATCH_THREAD_POOL_SIZE_KEY = "batchThreadPoolSize";
static final int DEFAULT_THREAD_POOL_SIZE = 4;
@@ -625,6 +626,7 @@
public static class Index extends Forwarding {
static final int DEFAULT_MAX_TRIES = 2;
static final Duration DEFAULT_RETRY_INTERVAL = Duration.ofSeconds(30);
+ static final Duration DEFAULT_INITIAL_DELAY = Duration.ofMillis(0);
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
@@ -633,6 +635,7 @@
static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
+ private final long initialDelayMsec;
private final int batchThreadPoolSize;
private final Duration retryInterval;
private final int maxTries;
@@ -641,6 +644,8 @@
private Index(Config cfg) {
super(cfg, INDEX_SECTION);
threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
+ initialDelayMsec =
+ getDuration(cfg, INDEX_SECTION, INITIAL_DELAY, DEFAULT_INITIAL_DELAY).toMillis();
batchThreadPoolSize = getInt(cfg, INDEX_SECTION, BATCH_THREAD_POOL_SIZE_KEY, threadPoolSize);
retryInterval = getDuration(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_RETRY_INTERVAL);
maxTries = getMaxTries(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_MAX_TRIES);
@@ -652,6 +657,10 @@
return threadPoolSize;
}
+ public long initialDelayMsec() {
+ return initialDelayMsec;
+ }
+
public int batchThreadPoolSize() {
return batchThreadPoolSize;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
index 3d3212b..d48df46 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
@@ -17,14 +17,25 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Provider;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
public abstract class ExecutorProvider
implements Provider<ScheduledExecutorService>, LifecycleListener {
- private ScheduledExecutorService executor;
+ private ScheduledWithDelayExecutorService executor;
- protected ExecutorProvider(WorkQueue workQueue, int threadPoolSize, String threadNamePrefix) {
- executor = workQueue.createQueue(threadPoolSize, threadNamePrefix);
+ protected ExecutorProvider(
+ WorkQueue workQueue, int threadPoolSize, String threadNamePrefix, long scheduleDelayMsec) {
+ executor =
+ new ScheduledWithDelayExecutorService(
+ workQueue.createQueue(threadPoolSize, threadNamePrefix), scheduleDelayMsec);
}
@Override
@@ -42,4 +53,109 @@
public ScheduledExecutorService get() {
return executor;
}
+
+ private static class ScheduledWithDelayExecutorService implements ScheduledExecutorService {
+ private final ScheduledExecutorService executor; // delegate that every call is forwarded to
+ private final long scheduleDelayMsec; // extra milliseconds added by the schedule(...) overloads
+
+ ScheduledWithDelayExecutorService(
+ ScheduledExecutorService executorService, long scheduleDelayMsec) {
+ this.executor = executorService;
+ this.scheduleDelayMsec = scheduleDelayMsec;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return executor.schedule( // caller's delay is converted to ms, then scheduleDelayMsec is added
+ callable, unit.toMillis(delay) + scheduleDelayMsec, TimeUnit.MILLISECONDS);
+ } // NOTE(review): toMillis drops sub-millisecond precision and the sum is unchecked
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ return executor.schedule( // same delay arithmetic as the Callable overload
+ command, unit.toMillis(delay) + scheduleDelayMsec, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return executor.scheduleAtFixedRate(command, initialDelay, period, unit); // forwarded unchanged
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); // forwarded unchanged
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return executor.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return executor.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return executor.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return executor.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return executor.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return executor.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return executor.isTerminated();
+ }
+
+ @Override
+ public void shutdown() {
+ executor.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return executor.shutdownNow();
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return executor.submit(task); // forwarded unchanged: scheduleDelayMsec is not applied
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return executor.submit(task); // forwarded unchanged: scheduleDelayMsec is not applied
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return executor.submit(task, result); // forwarded unchanged: scheduleDelayMsec is not applied
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ executor.execute(command); // forwarded unchanged: scheduleDelayMsec is not applied
+ }
+ }
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
index a3a7e64..f82a42d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
@@ -31,6 +31,9 @@
@Inject
ForwardedBatchIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
super(
- workQueue, config.index().batchThreadPoolSize(), FORWARDED_BATCH_INDEX_EVENT_THREAD_PREFIX);
+ workQueue,
+ config.index().batchThreadPoolSize(),
+ FORWARDED_BATCH_INDEX_EVENT_THREAD_PREFIX,
+ config.index().initialDelayMsec());
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
index da623df..d7dd40f 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
@@ -29,6 +29,10 @@
@Inject
ForwardedIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
- super(workQueue, config.index().threadPoolSize(), FORWARDED_INDEX_EVENT_THREAD_PREFIX);
+ super(
+ workQueue,
+ config.index().threadPoolSize(),
+ FORWARDED_INDEX_EVENT_THREAD_PREFIX,
+ config.index().initialDelayMsec());
}
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 5027838..b089091 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -331,6 +331,15 @@
stale and needs to be investigated and manually reindexed.
Defaults to 2.
+```index.initialDelay```
+: The initial delay, internally converted to milliseconds, before triggering the
+ indexing operation after the indexing event has been received.
+ Typically needed when there is a well-known latency of propagation of the updates
+ across the nodes sharing the same NFS volume.
+ Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ Defaults to 0 milliseconds, meaning that indexing happens immediately when the indexing
+ event is received.
+
```index.retryInterval```
: The interval of time in milliseconds between the subsequent auto-retries.
Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).