Merge branch 'stable-3.11' into stable-3.12 * stable-3.11: Introduce index.initialDelay for delaying the indexing on peer nodes Change-Id: Ic5f4b31c1e1651bc6b810a5b1738b4f90d8e6d70
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java index 8d7c86a..f75bdaa 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -61,6 +61,7 @@ // common parameters to cache and index sections static final String THREAD_POOL_SIZE_KEY = "threadPoolSize"; + static final String INITIAL_DELAY = "initialDelay"; static final String BATCH_THREAD_POOL_SIZE_KEY = "batchThreadPoolSize"; static final int DEFAULT_THREAD_POOL_SIZE = 4; @@ -625,6 +626,7 @@ public static class Index extends Forwarding { static final int DEFAULT_MAX_TRIES = 2; static final Duration DEFAULT_RETRY_INTERVAL = Duration.ofSeconds(30); + static final Duration DEFAULT_INITIAL_DELAY = Duration.ofMillis(0); static final String INDEX_SECTION = "index"; static final String MAX_TRIES_KEY = "maxTries"; @@ -633,6 +635,7 @@ static final boolean DEFAULT_SYNCHRONIZE_FORCED = true; private final int threadPoolSize; + private final long initialDelayMsec; private final int batchThreadPoolSize; private final Duration retryInterval; private final int maxTries; @@ -641,6 +644,8 @@ private Index(Config cfg) { super(cfg, INDEX_SECTION); threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE); + initialDelayMsec = + getDuration(cfg, INDEX_SECTION, INITIAL_DELAY, DEFAULT_INITIAL_DELAY).toMillis(); batchThreadPoolSize = getInt(cfg, INDEX_SECTION, BATCH_THREAD_POOL_SIZE_KEY, threadPoolSize); retryInterval = getDuration(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_RETRY_INTERVAL); maxTries = getMaxTries(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_MAX_TRIES); @@ -652,6 +657,10 @@ return threadPoolSize; } + public long initialDelayMsec() { + return initialDelayMsec; + } + public int batchThreadPoolSize() { return batchThreadPoolSize; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java index 3d3212b..d48df46 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
@@ -17,14 +17,25 @@ import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Provider; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public abstract class ExecutorProvider implements Provider<ScheduledExecutorService>, LifecycleListener { - private ScheduledExecutorService executor; + private ScheduledWithDelayExecutorService executor; - protected ExecutorProvider(WorkQueue workQueue, int threadPoolSize, String threadNamePrefix) { - executor = workQueue.createQueue(threadPoolSize, threadNamePrefix); + protected ExecutorProvider( + WorkQueue workQueue, int threadPoolSize, String threadNamePrefix, long scheduleDelayMsec) { + executor = + new ScheduledWithDelayExecutorService( + workQueue.createQueue(threadPoolSize, threadNamePrefix), scheduleDelayMsec); } @Override @@ -42,4 +53,109 @@ public ScheduledExecutorService get() { return executor; } + + private static class ScheduledWithDelayExecutorService implements ScheduledExecutorService { + private final ScheduledExecutorService executor; + private final long scheduleDelayMsec; + + ScheduledWithDelayExecutorService( + ScheduledExecutorService executorService, long scheduleDelayMsec) { + this.executor = executorService; + this.scheduleDelayMsec = scheduleDelayMsec; + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return executor.schedule( + callable, unit.toMillis(delay) + scheduleDelayMsec, TimeUnit.MILLISECONDS); + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return executor.schedule( + command, unit.toMillis(delay) + scheduleDelayMsec, TimeUnit.MILLISECONDS); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return executor.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + return executor.invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return executor.invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return executor.invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return executor.invokeAny(tasks, timeout, unit); + } + + @Override + public boolean isShutdown() { + return executor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executor.isTerminated(); + } + + @Override + public void shutdown() { + executor.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return executor.shutdownNow(); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return executor.submit(task); + } + + @Override + public Future<?> submit(Runnable task) { + return executor.submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return executor.submit(task, result); + } + + @Override + public void execute(Runnable command) { + executor.execute(command); + } + } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java index a3a7e64..f82a42d 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
@@ -31,6 +31,9 @@ @Inject ForwardedBatchIndexExecutorProvider(WorkQueue workQueue, Configuration config) { super( - workQueue, config.index().batchThreadPoolSize(), FORWARDED_BATCH_INDEX_EVENT_THREAD_PREFIX); + workQueue, + config.index().batchThreadPoolSize(), + FORWARDED_BATCH_INDEX_EVENT_THREAD_PREFIX, + config.index().initialDelayMsec()); } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java index da623df..d7dd40f 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
@@ -29,6 +29,10 @@ @Inject ForwardedIndexExecutorProvider(WorkQueue workQueue, Configuration config) { - super(workQueue, config.index().threadPoolSize(), FORWARDED_INDEX_EVENT_THREAD_PREFIX); + super( + workQueue, + config.index().threadPoolSize(), + FORWARDED_INDEX_EVENT_THREAD_PREFIX, + config.index().initialDelayMsec()); } }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md index 5027838..b089091 100644 --- a/src/main/resources/Documentation/config.md +++ b/src/main/resources/Documentation/config.md
@@ -331,6 +331,15 @@ stale and needs to be investigated and manually reindexed. Defaults to 2. +```index.initialDelay``` +: The initial delay, internally converted in milliseconds, of triggering the + indexing operation after the indexing even has been received. + Typically needed when there is a well-known latency of propagation of the updates + across the nodes sharing the same NFS volume. + Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval). + Defaults to 0 milliseconds, meaning that indexing happens immediately when the indexing + event is received. + ```index.retryInterval``` : The interval of time in milliseconds between the subsequent auto-retries. Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).