Merge branch 'stable-3.2'
* stable-3.2:
Update plugin documentation
Avoid multiple indexing operation for a single change
Lock indexing events on forwarder site
Change-Id: I81e61d5f128667c5e1c81ecd0b140922a7ded7f6
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index 58a21b6..3bd58a3 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -45,6 +45,9 @@
public class Configuration {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
+ public static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+ public static final int DEFAULT_TIMEOUT_MS = 5000;
+
// common parameter to peerInfo section
static final String PEER_INFO_SECTION = "peerInfo";
@@ -54,7 +57,6 @@
static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
static final int DEFAULT_THREAD_POOL_SIZE = 4;
static final String NUM_STRIPED_LOCKS = "numStripedLocks";
- static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
private final Main main;
private final AutoReindex autoReindex;
@@ -333,6 +335,9 @@
}
public static class Http {
+ public static final int DEFAULT_MAX_TRIES = 360;
+ public static final int DEFAULT_RETRY_INTERVAL = 10000;
+
static final String HTTP_SECTION = "http";
static final String USER_KEY = "user";
static final String PASSWORD_KEY = "password";
@@ -341,10 +346,6 @@
static final String MAX_TRIES_KEY = "maxTries";
static final String RETRY_INTERVAL_KEY = "retryInterval";
- static final int DEFAULT_TIMEOUT_MS = 5000;
- static final int DEFAULT_MAX_TRIES = 360;
- static final int DEFAULT_RETRY_INTERVAL = 10000;
-
private final String user;
private final String password;
private final int connectionTimeout;
@@ -446,6 +447,7 @@
public static class Index extends Forwarding {
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
+ static final String WAIT_TIMEOUT_KEY = "waitTimeout";
static final String RETRY_INTERVAL_KEY = "retryInterval";
static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
@@ -453,6 +455,7 @@
private final int threadPoolSize;
private final int retryInterval;
private final int maxTries;
+ private final int waitTimeout;
private final int numStripedLocks;
private final boolean synchronizeForced;
@@ -462,6 +465,7 @@
numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
+ waitTimeout = getInt(cfg, INDEX_SECTION, WAIT_TIMEOUT_KEY, DEFAULT_TIMEOUT_MS);
synchronizeForced =
cfg.getBoolean(INDEX_SECTION, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
}
@@ -482,6 +486,10 @@
return maxTries;
}
+ public int waitTimeout() {
+ return waitTimeout;
+ }
+
public boolean synchronizeForced() {
return synchronizeForced;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
index 6635b80..8a267d6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
@@ -24,6 +24,7 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.CACHE_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_THREAD_POOL_SIZE;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT_MS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.EVENT_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.SYNCHRONIZE_KEY;
@@ -33,7 +34,6 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.CONNECTION_TIMEOUT_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_MAX_TRIES;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_RETRY_INTERVAL;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_TIMEOUT_MS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.HTTP_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.MAX_TRIES_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.PASSWORD_KEY;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
index a6dfc34..b1a595c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
@@ -14,7 +14,6 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.google.gerrit.entities.Account;
import com.google.gerrit.server.index.account.AccountIndexer;
import com.google.inject.Inject;
@@ -32,8 +31,7 @@
private final AccountIndexer indexer;
@Inject
- ForwardedIndexAccountHandler(AccountIndexer indexer, Configuration config) {
- super(config.index());
+ ForwardedIndexAccountHandler(AccountIndexer indexer) {
this.indexer = indexer;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
index 667677d..064783c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
@@ -55,7 +55,6 @@
@ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
OneOffRequestContext oneOffCtx,
ChangeCheckerImpl.Factory changeCheckerFactory) {
- super(config.index());
this.indexer = indexer;
this.indexExecutor = indexExecutor;
this.oneOffCtx = oneOffCtx;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
index c0f9330..ab31659 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
@@ -14,7 +14,6 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.google.gerrit.entities.AccountGroup;
import com.google.gerrit.server.index.group.GroupIndexer;
import com.google.inject.Inject;
@@ -32,8 +31,7 @@
private final GroupIndexer indexer;
@Inject
- ForwardedIndexGroupHandler(GroupIndexer indexer, Configuration config) {
- super(config.index());
+ ForwardedIndexGroupHandler(GroupIndexer indexer) {
this.indexer = indexer;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
index e837770..20ffefd 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
@@ -14,7 +14,6 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.google.gerrit.entities.Project;
import com.google.gerrit.index.project.ProjectIndexer;
import com.google.inject.Inject;
@@ -32,8 +31,7 @@
private final ProjectIndexer indexer;
@Inject
- ForwardedIndexProjectHandler(ProjectIndexer indexer, Configuration config) {
- super(config.index());
+ ForwardedIndexProjectHandler(ProjectIndexer indexer) {
this.indexer = indexer;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
index 48dfba3..44e35a8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
@@ -14,12 +14,12 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Striped;
import java.io.IOException;
+import java.util.Collections;
import java.util.Optional;
-import java.util.concurrent.locks.Lock;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Base class to handle forwarded indexing. This class is meant to be extended by classes used on
@@ -29,6 +29,7 @@
*/
public abstract class ForwardedIndexingHandler<T> {
protected static final FluentLogger log = FluentLogger.forEnclosingClass();
+ private final Set<T> inFlightIndexing = Collections.newSetFromMap(new ConcurrentHashMap<>());
public enum Operation {
INDEX,
@@ -40,16 +41,10 @@
}
}
- private final Striped<Lock> idLocks;
-
protected abstract void doIndex(T id, Optional<IndexEvent> indexEvent) throws IOException;
protected abstract void doDelete(T id, Optional<IndexEvent> indexEvent) throws IOException;
- protected ForwardedIndexingHandler(Configuration.Index indexConfig) {
- idLocks = Striped.lock(indexConfig.numStripedLocks());
- }
-
/**
* Index an item in the local node, indexing will not be forwarded to the other node.
*
@@ -60,11 +55,9 @@
*/
public void index(T id, Operation operation, Optional<IndexEvent> indexEvent) throws IOException {
log.atFine().log("%s %s %s", operation, id, indexEvent);
- try {
- Context.setForwardedEvent(true);
- Lock idLock = idLocks.get(id);
- idLock.lock();
+ if (inFlightIndexing.add(id)) {
try {
+ Context.setForwardedEvent(true);
switch (operation) {
case INDEX:
doIndex(id, indexEvent);
@@ -77,10 +70,12 @@
break;
}
} finally {
- idLock.unlock();
+ Context.unsetForwardedEvent();
+ inFlightIndexing.remove(id);
}
- } finally {
- Context.unsetForwardedEvent();
+ } else {
+ throw new InFlightIndexedException(
+ String.format("Indexing for %s %s %s already in flight", operation, id, indexEvent));
}
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/InFlightIndexedException.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/InFlightIndexedException.java
new file mode 100644
index 0000000..87d28b3
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/InFlightIndexedException.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder;
+
+import java.io.IOException;
+
+public class InFlightIndexedException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InFlightIndexedException(String msg) {
+ super(msg);
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 0697ad7..a352e38 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -14,6 +14,7 @@
package com.ericsson.gerrit.plugins.highavailability.index;
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
@@ -28,7 +29,8 @@
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
class IndexEventHandler
implements ChangeIndexedListener,
@@ -36,25 +38,34 @@
GroupIndexedListener,
ProjectIndexedListener {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
- private final Executor executor;
+ private final ScheduledExecutorService executor;
private final Forwarder forwarder;
private final String pluginName;
private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ChangeCheckerImpl.Factory changeChecker;
private final CurrentRequestContext currCtx;
+ private final IndexEventLocks locks;
+
+ private final int retryInterval;
+ private final int maxTries;
@Inject
IndexEventHandler(
- @IndexExecutor Executor executor,
+ @IndexExecutor ScheduledExecutorService executor,
@PluginName String pluginName,
Forwarder forwarder,
ChangeCheckerImpl.Factory changeChecker,
- CurrentRequestContext currCtx) {
+ CurrentRequestContext currCtx,
+ Configuration cfg,
+ IndexEventLocks locks) {
this.forwarder = forwarder;
this.executor = executor;
this.pluginName = pluginName;
this.changeChecker = changeChecker;
this.currCtx = currCtx;
+ this.locks = locks;
+ this.retryInterval = cfg.http().retryInterval();
+ this.maxTries = cfg.http().maxTries();
}
@Override
@@ -128,6 +139,7 @@
abstract class IndexTask implements Runnable {
protected final IndexEvent indexEvent;
+ private int retryCount = 0;
IndexTask() {
indexEvent = new IndexEvent();
@@ -139,8 +151,22 @@
@Override
public void run() {
- queuedTasks.remove(this);
- execute();
+ locks.withLock(
+ this,
+ () -> {
+ queuedTasks.remove(this);
+ execute();
+ },
+ this::reschedule);
+ }
+
+ private void reschedule() {
+ if (++retryCount <= maxTries) {
+ log.atFine().log("Retrying %d times to %s", retryCount, this);
+ executor.schedule(this, retryInterval, TimeUnit.MILLISECONDS);
+ } else {
+ log.atSevere().log("Failed to %s after %d tries; giving up", this, maxTries);
+ }
}
abstract void execute();
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
new file mode 100644
index 0000000..4617e08
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexTask;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+public class IndexEventLocks {
+ private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+ private static final int NUMBER_OF_INDEX_TASK_TYPES = 4;
+
+ private final Striped<Lock> locks;
+ private final long waitTimeout;
+
+ @Inject
+ public IndexEventLocks(Configuration cfg) {
+ this.locks = Striped.lock(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks());
+ this.waitTimeout = cfg.index().waitTimeout();
+ }
+
+ public void withLock(
+ IndexTask id, VoidFunction function, VoidFunction lockAcquireTimeoutCallback) {
+ Lock idLock = getLock(id);
+ try {
+ if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
+ try {
+ function.invoke();
+ } finally {
+ idLock.unlock();
+ }
+ } else {
+ lockAcquireTimeoutCallback.invoke();
+ }
+ } catch (InterruptedException e) {
+ log.atSevere().withCause(e).log("%s was interrupted; giving up", id);
+ }
+ }
+
+ @VisibleForTesting
+ protected Lock getLock(IndexTask id) {
+ return locks.get(id);
+ }
+
+ @FunctionalInterface
+ public interface VoidFunction {
+ void invoke();
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index c17731f..9d0d03d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -20,18 +20,21 @@
import com.google.gerrit.extensions.events.ProjectIndexedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
public class IndexModule extends LifecycleModule {
@Override
protected void configure() {
- bind(Executor.class).annotatedWith(IndexExecutor.class).toProvider(IndexExecutorProvider.class);
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(IndexExecutor.class)
+ .toProvider(IndexExecutorProvider.class);
bind(ScheduledExecutorService.class)
.annotatedWith(ForwardedIndexExecutor.class)
.toProvider(ForwardedIndexExecutorProvider.class);
+ bind(IndexEventLocks.class).in(Scopes.SINGLETON);
listener().to(IndexExecutorProvider.class);
DynamicSet.bind(binder(), ChangeIndexedListener.class).to(IndexEventHandler.class);
DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 1d1a02b..fd0b56e 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -1,20 +1,19 @@
This plugin allows making Gerrit highly available by having redundant Gerrit
-masters.
+active/passive instances.
-The masters must be:
+The Gerrit instances must be:
-* connecting to the same database
* sharing the git repositories using a shared file system (e.g. NFS)
* behind a load balancer (e.g. HAProxy)
-Currently, the mode supported is one primary (active) master and multiple backup
-(passive) masters but eventually the plan is to support `n` active masters. In
-the active/passive mode, the active master is handling all traffic while the
-passives are kept updated to be always ready to take over.
+Currently, the mode supported is one active instance and multiple backup
+(passive) instances but eventually the plan is to support `n` active instances.
+In the active/passive mode, the active instance is handling all traffic while
+the passives are kept updated to be always ready to take over.
-Even if database and git repositories are shared by the masters, there are a few
-areas of concern in order to be able to switch traffic between masters in a
+Even if git repositories are shared by the instances, there are a few areas
+of concern in order to be able to switch traffic between instances in a
transparent manner from the user's perspective. The 4 areas of concern are
things that Gerrit stores either in memory or locally in the review site:
@@ -23,69 +22,67 @@
* stream-events
* web sessions
-They need either to be shared or kept local to each master but synchronized.
-This plugin needs to be installed in all the masters and it will take care of sharing
-or synchronizing them.
+They need either to be shared or kept local to each instances but synchronized.
+This plugin needs to be installed in all the instances and it will take care of
+sharing or synchronizing them.
#### Caches
-Every time a cache eviction occurs in one of the masters, the eviction will be
-forwarded the other masters so their caches do not contain stale entries.
+Every time a cache eviction occurs in one of the instances, the eviction will be
+forwarded the other nodes so their caches do not contain stale entries.
#### Secondary indexes
-Every time the secondary index is modified in one of the masters, e.g., a change
-is added, updated or removed from the index, the others master's index are
-updated accordingly. This way, both indexes are kept synchronized.
+Every time the secondary index is modified in one of the instances, e.g., a change
+is added, updated or removed from the index, the others instances index are updated
+accordingly. This way, both indexes are kept synchronized.
#### Stream events
-Every time a stream event occurs in one of the masters (see [more events info]
-(https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html#events)),
-the event is forwarded to the other masters which re-plays it. This way, the
-output of the stream-events command is the same, no matter which master a client
-is connected to.
+Every time a stream event occurs in one of the instances
+(see [more events info](https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html#events)),
+the event is forwarded to the other instances which re-plays it. This way, the output
+of the stream-events command is the same, no matter which instance a client is
+connected to.
#### Web session
The built-in Gerrit H2 based web session cache is replaced with a file based
-implementation that is shared amongst the masters.
+implementation that is shared amongst the instances.
## Setup
Prerequisites:
-* Unique database server must be accessible from all the masters
* Git repositories must be located on a shared file system
* A directory on a shared file system must be available for @PLUGIN@ to use
-For the masters:
+For the instances:
-* Configure the database section in gerrit.config to use the shared database
* Configure gerrit.basePath in gerrit.config to the shared repositories location
* Configure gerrit.serverId in gerrit.config based on [config](config.md)'s introduction
* Install and configure this @PLUGIN@ plugin [further](config.md) or based on below.
Here is an example of the minimal @PLUGIN@.config:
-Primary master
+Active instance
```
[main]
- sharedDirectory = /directory/accessible/from/both/masters
+ sharedDirectory = /directory/accessible/from/both/nodes
[peerInfo "static"]
- url = http://backupMasterHost1:8081/
+ url = http://backupNodeHost1:8081/
[http]
user = username
password = password
```
-Backup master
+Backup instance
```
[main]
- sharedDirectory = /directory/accessible/from/both/masters
+ sharedDirectory = /directory/accessible/from/both/nodes
[peerInfo "static"]
- url = http://primaryMasterHost:8080/
+ url = http://primaryNodeHost:8080/
[http]
user = username
@@ -94,10 +91,10 @@
### HA replica site
-It is possible to create a copy of the master site and configure both sites to run
-in HA mode as peers. This is possible when the directory where the copy will be
-created is accessible from this machine. Such a replica site can be created by
-means of a gerrit [site init](../../../Documentation/pgm-init.html) step,
+It is possible to create a copy of the instance site and configure both
+sites to run in HA mode as peers. This is possible when the directory where
+the copy will be created is accessible from this machine. Such a replica site
+can be created by means of a gerrit [site init](../../../Documentation/pgm-init.html) step,
contributed by the plugin under its init section.
This init step is optional but defaults to creating the replica. If you want to
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index c0f2955..cdb7b47 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -55,7 +55,7 @@
```
```main.sharedDirectory```
-: Path to a directory accessible from both master instances.
+: Path to a directory accessible from both instances.
When given as a relative path, then it is resolved against the $SITE_PATH
or Gerrit server. For example, if $SITE_PATH is "/gerrit/root" and
sharedDirectory is given as "shared/dir" then the real path of the shared
@@ -160,8 +160,12 @@
: The interval of time in milliseconds between the subsequent auto-retries.
When not specified, the default value is set to 10000ms.
-NOTE: the default settings for `http.timeout` and `http.maxTries` ensure that
-the plugin will keep retrying to forward a message for one hour.
+```index.waitTimeout```
+: Maximum interval of time in milliseconds the plugin waits to acquire the lock for
+ an indexing call. When not specified, the default value is set to 5000ms.
+
+NOTE: the default settings for `http.timeout`, `http.maxTries` and `http.lockAcquireTimeout`
+ensure that the plugin will keep retrying to forward a message for one hour.
```cache.synchronize```
: Whether to synchronize cache evictions.
@@ -210,6 +214,11 @@
: The interval of time in milliseconds between the subsequent auto-retries.
Defaults to 30000 (30 seconds).
+```index.waitTimeout```
+: Maximum interval of time in milliseconds the plugin waits to acquire
+ the lock for an indexing call. When not specified, the default value
+ is set to 5000ms.
+
```websession.synchronize```
: Whether to synchronize web sessions.
Defaults to true.
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
index e28f8c9..a9537bd 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
@@ -18,6 +18,7 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_NUM_STRIPED_LOCKS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_THREAD_POOL_SIZE;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT_MS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.EVENT_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.SYNCHRONIZE_KEY;
@@ -27,7 +28,6 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.CONNECTION_TIMEOUT_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_MAX_TRIES;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_RETRY_INTERVAL;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_TIMEOUT_MS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.HTTP_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.MAX_TRIES_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.PASSWORD_KEY;
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
index 33ca11c..0e9cee8 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
@@ -18,9 +18,7 @@
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
import com.google.gerrit.entities.Account;
import com.google.gerrit.server.index.account.AccountIndexer;
@@ -37,16 +35,12 @@
public class ForwardedIndexAccountHandlerTest {
@Mock private AccountIndexer indexerMock;
- @Mock private Configuration configMock;
- @Mock private Configuration.Index indexMock;
private ForwardedIndexAccountHandler handler;
private Account.Id id;
@Before
public void setUp() throws Exception {
- when(configMock.index()).thenReturn(indexMock);
- when(indexMock.numStripedLocks()).thenReturn(10);
- handler = new ForwardedIndexAccountHandler(indexerMock, configMock);
+ handler = new ForwardedIndexAccountHandler(indexerMock);
id = Account.id(123);
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
index 9d852f2..7c1e6df 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -70,7 +70,6 @@
Change change = new Change(null, id, null, null, TimeUtil.nowTs());
when(changeNotes.getChange()).thenReturn(change);
when(configMock.index()).thenReturn(indexMock);
- when(indexMock.numStripedLocks()).thenReturn(10);
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
handler =
new ForwardedIndexChangeHandler(
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
index b4f887c..5fdb151 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -18,9 +18,7 @@
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
import com.google.gerrit.entities.AccountGroup;
import com.google.gerrit.server.index.group.GroupIndexer;
@@ -37,16 +35,12 @@
public class ForwardedIndexGroupHandlerTest {
@Mock private GroupIndexer indexerMock;
- @Mock private Configuration configMock;
- @Mock private Configuration.Index indexMock;
private ForwardedIndexGroupHandler handler;
private AccountGroup.UUID uuid;
@Before
public void setUp() throws Exception {
- when(configMock.index()).thenReturn(indexMock);
- when(indexMock.numStripedLocks()).thenReturn(10);
- handler = new ForwardedIndexGroupHandler(indexerMock, configMock);
+ handler = new ForwardedIndexGroupHandler(indexerMock);
uuid = AccountGroup.uuid("123");
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
index 5e05fd6..0c896b3 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -18,9 +18,7 @@
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
import com.google.gerrit.entities.Project;
import com.google.gerrit.index.project.ProjectIndexer;
@@ -37,16 +35,12 @@
public class ForwardedIndexProjectHandlerTest {
@Mock private ProjectIndexer indexerMock;
- @Mock private Configuration configMock;
- @Mock private Configuration.Index indexMock;
private ForwardedIndexProjectHandler handler;
private Project.NameKey nameKey;
@Before
public void setUp() {
- when(configMock.index()).thenReturn(indexMock);
- when(indexMock.numStripedLocks()).thenReturn(10);
- handler = new ForwardedIndexProjectHandler(indexerMock, configMock);
+ handler = new ForwardedIndexProjectHandler(indexerMock);
nameKey = Project.nameKey("project/name");
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index eff8185..9d8f015 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -32,20 +32,31 @@
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexAccountTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexChangeTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexGroupTask;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexProjectTask;
import com.google.gerrit.entities.Account;
import com.google.gerrit.entities.AccountGroup;
import com.google.gerrit.entities.Change;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import java.util.Collection;
+import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
@@ -64,7 +75,10 @@
private Change.Id changeId;
private Account.Id accountId;
private AccountGroup.UUID accountGroupUUID;
+ private ScheduledExecutorService executor = new CurrentThreadScheduledExecutorService();
@Mock private RequestContext mockCtx;
+ @Mock private Configuration configuration;
+ private IndexEventLocks idLocks;
private CurrentRequestContext currCtx =
new CurrentRequestContext(null, null, null) {
@@ -82,17 +96,41 @@
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
+ Configuration.Index cfgIndex = mock(Configuration.Index.class);
+ when(configuration.index()).thenReturn(cfgIndex);
+ when(cfgIndex.numStripedLocks()).thenReturn(Configuration.DEFAULT_NUM_STRIPED_LOCKS);
+ when(cfgIndex.waitTimeout()).thenReturn(Configuration.DEFAULT_TIMEOUT_MS);
+
+ Configuration.Http http = mock(Configuration.Http.class);
+ when(configuration.http()).thenReturn(http);
+ when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
+ when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
+
+ idLocks = new IndexEventLocks(configuration);
setUpIndexEventHandler(currCtx);
}
public void setUpIndexEventHandler(CurrentRequestContext currCtx) throws Exception {
+ setUpIndexEventHandler(currCtx, idLocks, configuration);
+ }
+
+ public void setUpIndexEventHandler(CurrentRequestContext currCtx, IndexEventLocks idLocks)
+ throws Exception {
+ setUpIndexEventHandler(currCtx, idLocks, configuration);
+ }
+
+ public void setUpIndexEventHandler(
+ CurrentRequestContext currCtx, IndexEventLocks idLocks, Configuration configuration)
+ throws Exception {
indexEventHandler =
new IndexEventHandler(
- MoreExecutors.directExecutor(),
+ executor,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
- currCtx);
+ currCtx,
+ configuration,
+ idLocks);
}
@Test
@@ -115,6 +153,153 @@
}
@Test
+ public void shouldNotIndexChangeWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+ verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ }
+
+ @Test
+ public void shouldNotIndexAccountWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexAccountTask.class))).thenReturn(lock);
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onAccountIndexed(accountId.get());
+
+ verify(forwarder, never()).indexAccount(eq(ACCOUNT_ID), any());
+ }
+
+ @Test
+ public void shouldNotDeleteChangeWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(DeleteChangeTask.class))).thenReturn(lock);
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onChangeDeleted(changeId.get());
+
+ verify(forwarder, never()).deleteChangeFromIndex(eq(CHANGE_ID), any());
+ }
+
+ @Test
+ public void shouldNotIndexGroupWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexGroupTask.class))).thenReturn(lock);
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onGroupIndexed(accountGroupUUID.get());
+
+ verify(forwarder, never()).indexGroup(eq(UUID), any());
+ }
+
+ @Test
+ public void shouldNotIndexProjectWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexProjectTask.class))).thenReturn(lock);
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onProjectIndexed(PROJECT_NAME);
+
+ verify(forwarder, never()).indexProject(eq(PROJECT_NAME), any());
+ }
+
+ @Test
+ public void shouldRetryIndexChangeWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false, true);
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+ verify(locks, times(2)).withLock(any(), any(), any());
+ verify(forwarder, times(1)).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ }
+
+ @Test
+ public void shouldRetryUpToMaxTriesWhenCannotAcquireLock() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+
+ Configuration cfg = mock(Configuration.class);
+ Configuration.Http httpCfg = mock(Configuration.Http.class);
+ when(httpCfg.maxTries()).thenReturn(10);
+ when(cfg.http()).thenReturn(httpCfg);
+ setUpIndexEventHandler(currCtx, locks, cfg);
+
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+ verify(locks, times(11)).withLock(any(), any(), any());
+ verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ }
+
+ @Test
+ public void shouldNotRetryWhenMaxTriesLowerThanOne() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock lock = mock(Lock.class);
+ when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+
+ Configuration cfg = mock(Configuration.class);
+ Configuration.Http httpCfg = mock(Configuration.Http.class);
+ when(httpCfg.maxTries()).thenReturn(0);
+ when(cfg.http()).thenReturn(httpCfg);
+ setUpIndexEventHandler(currCtx, locks, cfg);
+
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+ verify(locks, times(1)).withLock(any(), any(), any());
+ verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ }
+
+ @Test
+ public void shouldLockPerIndexEventType() throws Exception {
+ IndexEventLocks locks = mock(IndexEventLocks.class);
+ Lock indexChangeLock = mock(Lock.class);
+ when(indexChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
+ Lock accountChangeLock = mock(Lock.class);
+ when(accountChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(true);
+ when(locks.getLock(any(IndexChangeTask.class))).thenReturn(indexChangeLock);
+ when(locks.getLock(any(IndexAccountTask.class))).thenReturn(accountChangeLock);
+ Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+ setUpIndexEventHandler(currCtx, locks);
+
+ indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+ indexEventHandler.onAccountIndexed(accountId.get());
+
+ verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+ verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
+ }
+
+ @Test
public void shouldReindexInRemoteWhenContextIsMissingButForcedIndexingEnabled() throws Exception {
ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
@@ -179,7 +364,14 @@
public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
- new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
+ new IndexEventHandler(
+ poolMock,
+ PLUGIN_NAME,
+ forwarder,
+ changeCheckerFactoryMock,
+ currCtx,
+ configuration,
+ idLocks);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(poolMock, times(1))
@@ -190,7 +382,14 @@
public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
- new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
+ new IndexEventHandler(
+ poolMock,
+ PLUGIN_NAME,
+ forwarder,
+ changeCheckerFactoryMock,
+ currCtx,
+ configuration,
+ idLocks);
indexEventHandler.onAccountIndexed(accountId.get());
indexEventHandler.onAccountIndexed(accountId.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
@@ -200,7 +399,14 @@
public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
- new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
+ new IndexEventHandler(
+ poolMock,
+ PLUGIN_NAME,
+ forwarder,
+ changeCheckerFactoryMock,
+ currCtx,
+ configuration,
+ idLocks);
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));
@@ -316,4 +522,98 @@
assertThat(task.equals(differentGroupIdTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(differentGroupIdTask.hashCode());
}
+
+ private class CurrentThreadScheduledExecutorService implements ScheduledExecutorService {
+
+ @Override
+ public void shutdown() {}
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return false;
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return null;
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return null;
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+ command.run();
+ return null;
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(
+ Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(
+ Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return null;
+ }
+ }
}