Merge branch 'stable-3.8' into stable-3.9
* stable-3.8:
Add missing binding of ProjectDeletedListener
Fix the Docker-based setup for the HA test environment
Verify high-availability formatting using GJF 1.7
Also revert the GJF 1.7 change, keep the default formatting version
from Gerrit, and fix the formatting issues.
Change-Id: I1858c0f059263fee9bb80206d846fb2235b754ac
diff --git a/BUILD b/BUILD
index 01ed84f..9bb0445 100644
--- a/BUILD
+++ b/BUILD
@@ -22,6 +22,7 @@
deps = [
"@jgroups//jar",
"@jgroups-kubernetes//jar",
+ "@failsafe//jar",
":global-refdb-neverlink",
],
)
@@ -55,6 +56,7 @@
"@global-refdb//jar",
"@wiremock//jar",
"@jgroups//jar",
- "@commons-net//jar"
+ "@commons-net//jar",
+ "@failsafe//jar",
],
)
diff --git a/Jenkinsfile b/Jenkinsfile
index 503cf93..98183a5 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -1,4 +1,3 @@
pluginPipeline(formatCheckId: 'gerritforge:plugins-high-availability-code-style',
buildCheckId: 'gerritforge:plugins-high-availability-build-test',
- gjfVersion: '1.7',
extraModules: ['global-refdb'])
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 09f34de..02c1df5 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -24,3 +24,9 @@
artifact = "com.gerritforge:global-refdb:3.7.4",
sha1 = "a5f3fcdbc04b7e98c52ecd50d2a56424e60b0575",
)
+
+ maven_jar(
+ name = "failsafe",
+ artifact = "dev.failsafe:failsafe:3.3.2",
+ sha1 = "738a986f1f0e4b6c6a49d351dddc772d1378c5a8",
+ )
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index fd5f2cb..55f7375 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -14,7 +14,6 @@
package com.ericsson.gerrit.plugins.highavailability;
-import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.gerritforge.gerrit.globalrefdb.validation.SharedRefDbConfiguration;
@@ -33,13 +32,13 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
@@ -50,8 +49,7 @@
public class Configuration {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
- public static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
- public static final int DEFAULT_TIMEOUT_MS = 5000;
+ public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);
public static final String PLUGIN_NAME = "high-availability";
public static final String PLUGIN_CONFIG_FILE = PLUGIN_NAME + ".config";
@@ -64,10 +62,7 @@
// common parameters to cache and index sections
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
static final String BATCH_THREAD_POOL_SIZE_KEY = "batchThreadPoolSize";
- static final int DEFAULT_INDEX_MAX_TRIES = 2;
- static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
static final int DEFAULT_THREAD_POOL_SIZE = 4;
- static final String NUM_STRIPED_LOCKS = "numStripedLocks";
private final Main main;
private final AutoReindex autoReindex;
@@ -89,6 +84,11 @@
STATIC
}
+ public enum Transport {
+ HTTP,
+ JGROUPS
+ }
+
@Inject
Configuration(SitePaths sitePaths) {
this(getConfigFile(sitePaths, PLUGIN_CONFIG_FILE), sitePaths);
@@ -199,14 +199,29 @@
}
}
+ private static int getMaxTries(Config cfg, String section, String name, int defaultValue) {
+ int v = getInt(cfg, section, name, defaultValue);
+ return 1 <= v ? v : defaultValue;
+ }
+
+ private static Duration getDuration(
+ Config cfg, String section, String setting, Duration defaultValue) {
+ return Duration.ofMillis(
+ ConfigUtil.getTimeUnit(cfg, section, null, setting, defaultValue.toMillis(), MILLISECONDS));
+ }
+
public static class Main {
static final String MAIN_SECTION = "main";
static final String SHARED_DIRECTORY_KEY = "sharedDirectory";
static final String DEFAULT_SHARED_DIRECTORY = "shared";
+ static final String TRANSPORT_KEY = "transport";
+ static final Transport DEFAULT_TRANSPORT = Transport.HTTP;
+ private final Transport transport;
private final Path sharedDirectory;
private Main(SitePaths site, Config cfg) {
+ transport = cfg.getEnum(MAIN_SECTION, null, TRANSPORT_KEY, DEFAULT_TRANSPORT);
String shared = Strings.emptyToNull(cfg.getString(MAIN_SECTION, null, SHARED_DIRECTORY_KEY));
if (shared == null) {
shared = DEFAULT_SHARED_DIRECTORY;
@@ -219,6 +234,10 @@
}
}
+ public Transport transport() {
+ return transport;
+ }
+
public Path sharedDirectory() {
return sharedDirectory;
}
@@ -231,38 +250,29 @@
static final String DELAY = "delay";
static final String POLL_INTERVAL = "pollInterval";
static final boolean DEFAULT_AUTO_REINDEX = false;
- static final long DEFAULT_DELAY = 10L;
- static final long DEFAULT_POLL_INTERVAL = 0L;
+ static final Duration DEFAULT_DELAY = Duration.ofSeconds(10);
+ static final Duration DEFAULT_POLL_INTERVAL = Duration.ZERO;
private final boolean enabled;
- private final long delaySec;
- private final long pollSec;
+ private final Duration delay;
+ private final Duration pollInterval;
public AutoReindex(Config cfg) {
enabled = cfg.getBoolean(AUTO_REINDEX_SECTION, ENABLED, DEFAULT_AUTO_REINDEX);
- delaySec =
- ConfigUtil.getTimeUnit(
- cfg, AUTO_REINDEX_SECTION, null, DELAY, DEFAULT_DELAY, TimeUnit.SECONDS);
- pollSec =
- ConfigUtil.getTimeUnit(
- cfg,
- AUTO_REINDEX_SECTION,
- null,
- POLL_INTERVAL,
- DEFAULT_POLL_INTERVAL,
- TimeUnit.SECONDS);
+ delay = getDuration(cfg, AUTO_REINDEX_SECTION, DELAY, DEFAULT_DELAY);
+ pollInterval = getDuration(cfg, AUTO_REINDEX_SECTION, POLL_INTERVAL, DEFAULT_POLL_INTERVAL);
}
public boolean enabled() {
return enabled;
}
- public long delaySec() {
- return delaySec;
+ public Duration delay() {
+ return delay;
}
- public long pollSec() {
- return pollSec;
+ public Duration pollInterval() {
+ return pollInterval;
}
}
@@ -325,8 +335,14 @@
}
public static class JGroups {
+ static final int DEFAULT_MAX_TRIES = 720;
+ static final Duration DEFAULT_RETRY_INTERVAL = Duration.ofSeconds(10);
+
static final String SKIP_INTERFACE_KEY = "skipInterface";
static final String CLUSTER_NAME_KEY = "clusterName";
+ static final String MAX_TRIES_KEY = "maxTries";
+ static final String RETRY_INTERVAL_KEY = "retryInterval";
+ static final String TIMEOUT_KEY = "timeout";
static final String KUBERNETES_KEY = "kubernetes";
static final String PROTOCOL_STACK_KEY = "protocolStack";
static final ImmutableList<String> DEFAULT_SKIP_INTERFACE_LIST =
@@ -335,6 +351,10 @@
private final ImmutableList<String> skipInterface;
private final String clusterName;
+ private final Duration timeout;
+ private final int maxTries;
+ private final Duration retryInterval;
+ private final int threadPoolSize;
private final boolean useKubernetes;
private final Optional<Path> protocolStack;
@@ -344,6 +364,10 @@
log.atFine().log("Skip interface(s): %s", skipInterface);
clusterName = getString(cfg, JGROUPS_SECTION, null, CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
log.atFine().log("Cluster name: %s", clusterName);
+ timeout = getDuration(cfg, JGROUPS_SECTION, TIMEOUT_KEY, DEFAULT_TIMEOUT);
+ maxTries = getMaxTries(cfg, JGROUPS_SECTION, MAX_TRIES_KEY, DEFAULT_MAX_TRIES);
+ retryInterval = getDuration(cfg, JGROUPS_SECTION, RETRY_INTERVAL_KEY, DEFAULT_RETRY_INTERVAL);
+ threadPoolSize = getInt(cfg, JGROUPS_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
useKubernetes = cfg.getBoolean(JGROUPS_SECTION, KUBERNETES_KEY, false);
protocolStack = getProtocolStack(cfg, site);
log.atFine().log(
@@ -374,6 +398,22 @@
return clusterName;
}
+ public Duration timeout() {
+ return timeout;
+ }
+
+ public int maxTries() {
+ return maxTries;
+ }
+
+ public Duration retryInterval() {
+ return retryInterval;
+ }
+
+ public int threadPoolSize() {
+ return threadPoolSize;
+ }
+
public boolean useKubernetes() {
return useKubernetes;
}
@@ -403,7 +443,8 @@
public static class Http {
public static final int DEFAULT_MAX_TRIES = 360;
- public static final int DEFAULT_RETRY_INTERVAL = 10000;
+ public static final Duration DEFAULT_RETRY_INTERVAL = Duration.ofSeconds(10);
+ public static final int DEFAULT_THREAD_POOL_SIZE = 4;
static final String HTTP_SECTION = "http";
static final String USER_KEY = "user";
@@ -412,21 +453,24 @@
static final String SOCKET_TIMEOUT_KEY = "socketTimeout";
static final String MAX_TRIES_KEY = "maxTries";
static final String RETRY_INTERVAL_KEY = "retryInterval";
+ static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
private final String user;
private final String password;
- private final int connectionTimeout;
- private final int socketTimeout;
+ private final Duration connectionTimeout;
+ private final Duration socketTimeout;
private final int maxTries;
- private final int retryInterval;
+ private final Duration retryInterval;
+ private final int threadPoolSize;
private Http(Config cfg) {
user = Strings.nullToEmpty(cfg.getString(HTTP_SECTION, null, USER_KEY));
password = Strings.nullToEmpty(cfg.getString(HTTP_SECTION, null, PASSWORD_KEY));
- connectionTimeout = getInt(cfg, HTTP_SECTION, CONNECTION_TIMEOUT_KEY, DEFAULT_TIMEOUT_MS);
- socketTimeout = getInt(cfg, HTTP_SECTION, SOCKET_TIMEOUT_KEY, DEFAULT_TIMEOUT_MS);
- maxTries = getInt(cfg, HTTP_SECTION, MAX_TRIES_KEY, DEFAULT_MAX_TRIES);
- retryInterval = getInt(cfg, HTTP_SECTION, RETRY_INTERVAL_KEY, DEFAULT_RETRY_INTERVAL);
+ connectionTimeout = getDuration(cfg, HTTP_SECTION, CONNECTION_TIMEOUT_KEY, DEFAULT_TIMEOUT);
+ socketTimeout = getDuration(cfg, HTTP_SECTION, SOCKET_TIMEOUT_KEY, DEFAULT_TIMEOUT);
+ maxTries = getMaxTries(cfg, HTTP_SECTION, MAX_TRIES_KEY, DEFAULT_MAX_TRIES);
+ retryInterval = getDuration(cfg, HTTP_SECTION, RETRY_INTERVAL_KEY, DEFAULT_RETRY_INTERVAL);
+ threadPoolSize = getInt(cfg, HTTP_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
}
public String user() {
@@ -437,11 +481,11 @@
return password;
}
- public int connectionTimeout() {
+ public Duration connectionTimeout() {
return connectionTimeout;
}
- public int socketTimeout() {
+ public Duration socketTimeout() {
return socketTimeout;
}
@@ -449,9 +493,13 @@
return maxTries;
}
- public int retryInterval() {
+ public Duration retryInterval() {
return retryInterval;
}
+
+ public int threadPoolSize() {
+ return threadPoolSize;
+ }
}
/** Common parameters to cache, event, index and websession */
@@ -485,19 +533,13 @@
static final String CACHE_SECTION = "cache";
static final String PATTERN_KEY = "pattern";
- private final int threadPoolSize;
private final List<String> patterns;
private Cache(Config cfg) {
super(cfg, CACHE_SECTION);
- threadPoolSize = getInt(cfg, CACHE_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
patterns = Arrays.asList(cfg.getStringList(CACHE_SECTION, null, PATTERN_KEY));
}
- public int threadPoolSize() {
- return threadPoolSize;
- }
-
public List<String> patterns() {
return Collections.unmodifiableList(patterns);
}
@@ -521,27 +563,27 @@
}
public static class Index extends Forwarding {
+ static final int DEFAULT_MAX_TRIES = 2;
+ static final Duration DEFAULT_RETRY_INTERVAL = Duration.ofSeconds(30);
+
static final String INDEX_SECTION = "index";
static final String MAX_TRIES_KEY = "maxTries";
- static final String WAIT_TIMEOUT_KEY = "waitTimeout";
static final String RETRY_INTERVAL_KEY = "retryInterval";
static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
private final int batchThreadPoolSize;
- private final int retryInterval;
+ private final Duration retryInterval;
private final int maxTries;
- private final int numStripedLocks;
private final boolean synchronizeForced;
private Index(Config cfg) {
super(cfg, INDEX_SECTION);
threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
batchThreadPoolSize = getInt(cfg, INDEX_SECTION, BATCH_THREAD_POOL_SIZE_KEY, threadPoolSize);
- numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
- retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
- maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
+ retryInterval = getDuration(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_RETRY_INTERVAL);
+ maxTries = getMaxTries(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_MAX_TRIES);
synchronizeForced =
cfg.getBoolean(INDEX_SECTION, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
}
@@ -554,11 +596,7 @@
return batchThreadPoolSize;
}
- public int numStripedLocks() {
- return numStripedLocks;
- }
-
- public int retryInterval() {
+ public Duration retryInterval() {
return retryInterval;
}
@@ -574,21 +612,18 @@
public static class Websession extends Forwarding {
static final String WEBSESSION_SECTION = "websession";
static final String CLEANUP_INTERVAL_KEY = "cleanupInterval";
- static final String DEFAULT_CLEANUP_INTERVAL = "24 hours";
- static final long DEFAULT_CLEANUP_INTERVAL_MS = HOURS.toMillis(24);
+ static final String DEFAULT_CLEANUP_INTERVAL_AS_STRING = "24 hours";
+ static final Duration DEFAULT_CLEANUP_INTERVAL = Duration.ofHours(24);
- private final long cleanupInterval;
+ private final Duration cleanupInterval;
private Websession(Config cfg) {
super(cfg, WEBSESSION_SECTION);
cleanupInterval =
- ConfigUtil.getTimeUnit(
- Strings.nullToEmpty(cfg.getString(WEBSESSION_SECTION, null, CLEANUP_INTERVAL_KEY)),
- DEFAULT_CLEANUP_INTERVAL_MS,
- MILLISECONDS);
+ getDuration(cfg, WEBSESSION_SECTION, CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL);
}
- public long cleanupInterval() {
+ public Duration cleanupInterval() {
return cleanupInterval;
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Module.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Module.java
index 26bdd5d..170a79a 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Module.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Module.java
@@ -18,6 +18,7 @@
import com.ericsson.gerrit.plugins.highavailability.cache.CacheModule;
import com.ericsson.gerrit.plugins.highavailability.event.EventModule;
import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderModule;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups.JGroupsForwarderModule;
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.RestForwarderModule;
import com.ericsson.gerrit.plugins.highavailability.index.IndexModule;
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfoModule;
@@ -44,7 +45,18 @@
protected void configure() {
install(new EnvModule());
install(new ForwarderModule());
- install(new RestForwarderModule());
+
+ switch (config.main().transport()) {
+ case HTTP:
+ install(new RestForwarderModule());
+ install(new PeerInfoModule(config.peerInfo().strategy()));
+ break;
+ case JGROUPS:
+ install(new JGroupsForwarderModule());
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported transport: " + config.main().transport());
+ }
if (config.cache().synchronize()) {
install(new CacheModule());
@@ -58,7 +70,6 @@
if (config.autoReindex().enabled()) {
install(new AutoReindexModule());
}
- install(new PeerInfoModule(config.peerInfo().strategy()));
if (config.sharedRefDb().getSharedRefDb().isEnabled()) {
listener().to(PluginStartup.class);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
index 8a267d6..a089b23 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
@@ -24,7 +24,7 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.CACHE_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_THREAD_POOL_SIZE;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT_MS;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.EVENT_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.SYNCHRONIZE_KEY;
@@ -55,7 +55,7 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.PeerInfoStatic.URL_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.THREAD_POOL_SIZE_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.CLEANUP_INTERVAL_KEY;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.DEFAULT_CLEANUP_INTERVAL;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.DEFAULT_CLEANUP_INTERVAL_AS_STRING;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.WEBSESSION_SECTION;
import com.ericsson.gerrit.plugins.highavailability.Configuration.PeerInfoStrategy;
@@ -135,7 +135,8 @@
config.setBoolean(AUTO_REINDEX_SECTION, null, ENABLED, autoReindex);
String delay =
- promptAndSetString("Delay", AUTO_REINDEX_SECTION, DELAY, numberToString(DEFAULT_DELAY));
+ promptAndSetString(
+ "Delay", AUTO_REINDEX_SECTION, DELAY, numberToString(DEFAULT_DELAY.toMillis()));
config.setLong(AUTO_REINDEX_SECTION, null, DELAY, Long.valueOf(delay));
String pollInterval =
@@ -143,7 +144,7 @@
"Poll interval",
AUTO_REINDEX_SECTION,
POLL_INTERVAL,
- numberToString(DEFAULT_POLL_INTERVAL));
+ numberToString(DEFAULT_POLL_INTERVAL.toMillis()));
config.setLong(AUTO_REINDEX_SECTION, null, POLL_INTERVAL, Long.valueOf(pollInterval));
}
@@ -203,17 +204,17 @@
"Retry interval [ms]",
HTTP_SECTION,
RETRY_INTERVAL_KEY,
- numberToString(DEFAULT_RETRY_INTERVAL));
+ numberToString(DEFAULT_RETRY_INTERVAL.toMillis()));
promptAndSetString(
"Connection timeout [ms]",
HTTP_SECTION,
CONNECTION_TIMEOUT_KEY,
- numberToString(DEFAULT_TIMEOUT_MS));
+ numberToString(DEFAULT_TIMEOUT.toMillis()));
promptAndSetString(
"Socket timeout [ms]",
HTTP_SECTION,
SOCKET_TIMEOUT_KEY,
- numberToString(DEFAULT_TIMEOUT_MS));
+ numberToString(DEFAULT_TIMEOUT.toMillis()));
}
private void configureCacheSection() {
@@ -247,7 +248,10 @@
ui.header("Websession section");
promptAndSetSynchronize("Websession", WEBSESSION_SECTION);
promptAndSetString(
- "Cleanup interval", WEBSESSION_SECTION, CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL);
+ "Cleanup interval",
+ WEBSESSION_SECTION,
+ CLEANUP_INTERVAL_KEY,
+ DEFAULT_CLEANUP_INTERVAL_AS_STRING);
}
private void configureHealthCheckSection() {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/AutoReindexScheduler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/AutoReindexScheduler.java
index ef7ae86..e948ba4 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/AutoReindexScheduler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/AutoReindexScheduler.java
@@ -20,6 +20,7 @@
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
@@ -55,27 +56,27 @@
@Override
public void start() {
- if (cfg.pollSec() > 0) {
+ if (cfg.pollInterval().compareTo(Duration.ZERO) > 0) {
log.atInfo().log(
- "Scheduling auto-reindex after %ds and every %ds", cfg.delaySec(), cfg.pollSec());
- futureTasks.add(
- executor.scheduleAtFixedRate(
- changeReindex, cfg.delaySec(), cfg.pollSec(), TimeUnit.SECONDS));
- futureTasks.add(
- executor.scheduleAtFixedRate(
- accountReindex, cfg.delaySec(), cfg.pollSec(), TimeUnit.SECONDS));
- futureTasks.add(
- executor.scheduleAtFixedRate(
- groupReindex, cfg.delaySec(), cfg.pollSec(), TimeUnit.SECONDS));
- futureTasks.add(
- executor.scheduleAtFixedRate(
- projectReindex, cfg.delaySec(), cfg.pollSec(), TimeUnit.SECONDS));
+ "Scheduling auto-reindex after %s and every %s", cfg.delay(), cfg.pollInterval());
+ // Use millis: toSeconds() truncates, so a sub-second pollInterval would become period 0,
+ // which scheduleAtFixedRate rejects with IllegalArgumentException.
+ for (Runnable reindexTask :
+ List.of(changeReindex, accountReindex, groupReindex, projectReindex)) {
+ futureTasks.add(
+ executor.scheduleAtFixedRate(
+ reindexTask,
+ cfg.delay().toMillis(),
+ cfg.pollInterval().toMillis(),
+ TimeUnit.MILLISECONDS));
+ }
} else {
- log.atInfo().log("Scheduling auto-reindex after %ds", cfg.delaySec());
- futureTasks.add(executor.schedule(changeReindex, cfg.delaySec(), TimeUnit.SECONDS));
- futureTasks.add(executor.schedule(accountReindex, cfg.delaySec(), TimeUnit.SECONDS));
- futureTasks.add(executor.schedule(groupReindex, cfg.delaySec(), TimeUnit.SECONDS));
- futureTasks.add(executor.schedule(projectReindex, cfg.delaySec(), TimeUnit.SECONDS));
+ log.atInfo().log("Scheduling auto-reindex after %s", cfg.delay());
+ for (Runnable reindexTask :
+ List.of(changeReindex, accountReindex, groupReindex, projectReindex)) {
+ futureTasks.add(
+ executor.schedule(reindexTask, cfg.delay().toMillis(), TimeUnit.MILLISECONDS));
+ }
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandler.java
index 6597ac5..53bc3a1 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandler.java
@@ -17,54 +17,23 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.google.common.cache.RemovalNotification;
-import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.inject.Inject;
-import java.util.concurrent.Executor;
class CacheEvictionHandler<K, V> implements CacheRemovalListener<K, V> {
- private final Executor executor;
private final Forwarder forwarder;
- private final String pluginName;
private final CachePatternMatcher matcher;
@Inject
- CacheEvictionHandler(
- Forwarder forwarder,
- @CacheExecutor Executor executor,
- @PluginName String pluginName,
- CachePatternMatcher matcher) {
+ CacheEvictionHandler(Forwarder forwarder, CachePatternMatcher matcher) {
this.forwarder = forwarder;
- this.executor = executor;
- this.pluginName = pluginName;
this.matcher = matcher;
}
@Override
public void onRemoval(String plugin, String cache, RemovalNotification<K, V> notification) {
if (!Context.isForwardedEvent() && !notification.wasEvicted() && matcher.matches(cache)) {
- executor.execute(new CacheEvictionTask(cache, notification.getKey()));
- }
- }
-
- class CacheEvictionTask implements Runnable {
- private final String cacheName;
- private final Object key;
-
- CacheEvictionTask(String cacheName, Object key) {
- this.cacheName = cacheName;
- this.key = key;
- }
-
- @Override
- public void run() {
- forwarder.evict(cacheName, key);
- }
-
- @Override
- public String toString() {
- return String.format(
- "[%s] Evict key '%s' from cache '%s' in target instance", pluginName, key, cacheName);
+ forwarder.evict(cache, notification.getKey());
}
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutorProvider.java
deleted file mode 100644
index 0872dff..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutorProvider.java
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.cache;
-
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-@Singleton
-class CacheExecutorProvider extends ExecutorProvider {
-
- @Inject
- CacheExecutorProvider(WorkQueue workQueue, Configuration config) {
- super(workQueue, config.cache().threadPoolSize(), "Forward-Cache-Eviction-Event");
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheModule.java
index 17e2613..aa613e1 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheModule.java
@@ -17,16 +17,13 @@
import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.cache.CacheRemovalListener;
-import java.util.concurrent.Executor;
+import com.google.inject.AbstractModule;
-public class CacheModule extends LifecycleModule {
+public class CacheModule extends AbstractModule {
@Override
protected void configure() {
- bind(Executor.class).annotatedWith(CacheExecutor.class).toProvider(CacheExecutorProvider.class);
- listener().to(CacheExecutorProvider.class);
DynamicSet.bind(binder(), CacheRemovalListener.class).to(CacheEvictionHandler.class);
DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ProjectListUpdateHandler.class);
DynamicSet.bind(binder(), ProjectDeletedListener.class).to(ProjectListUpdateHandler.class);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandler.java
index 7205137..9ce9536 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandler.java
@@ -16,27 +16,20 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
-import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.events.ProjectEvent;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import java.util.concurrent.Executor;
@Singleton
public class ProjectListUpdateHandler implements NewProjectCreatedListener, ProjectDeletedListener {
private final Forwarder forwarder;
- private final Executor executor;
- private final String pluginName;
@Inject
- public ProjectListUpdateHandler(
- Forwarder forwarder, @CacheExecutor Executor executor, @PluginName String pluginName) {
+ public ProjectListUpdateHandler(Forwarder forwarder) {
this.forwarder = forwarder;
- this.executor = executor;
- this.pluginName = pluginName;
}
@Override
@@ -53,33 +46,11 @@
private void process(ProjectEvent event, boolean delete) {
if (!Context.isForwardedEvent()) {
- executor.execute(new ProjectListUpdateTask(event.getProjectName(), delete));
- }
- }
-
- class ProjectListUpdateTask implements Runnable {
- private final String projectName;
- private final boolean delete;
-
- ProjectListUpdateTask(String projectName, boolean delete) {
- this.projectName = projectName;
- this.delete = delete;
- }
-
- @Override
- public void run() {
if (delete) {
- forwarder.removeFromProjectList(projectName);
+ forwarder.removeFromProjectList(event.getProjectName());
} else {
- forwarder.addToProjectList(projectName);
+ forwarder.addToProjectList(event.getProjectName());
}
}
-
- @Override
- public String toString() {
- return String.format(
- "[%s] Update project list in target instance: %s '%s'",
- pluginName, delete ? "remove" : "add", projectName);
- }
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutor.java
deleted file mode 100644
index 726484c..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.event;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface EventExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutorProvider.java
deleted file mode 100644
index b39eca2..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutorProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.event;
-
-import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-@Singleton
-class EventExecutorProvider extends ExecutorProvider {
-
- @Inject
- EventExecutorProvider(WorkQueue workQueue) {
- super(workQueue, 1, "Forward-Stream-Event");
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandler.java
index 025115c..5ce12df 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandler.java
@@ -16,48 +16,23 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
-import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
import com.google.gerrit.server.events.ProjectEvent;
import com.google.inject.Inject;
-import java.util.concurrent.Executor;
class EventHandler implements EventListener {
- private final Executor executor;
private final Forwarder forwarder;
- private final String pluginName;
@Inject
- EventHandler(
- Forwarder forwarder, @EventExecutor Executor executor, @PluginName String pluginName) {
+ EventHandler(Forwarder forwarder) { // The forwarder is the only injected collaborator after this change.
this.forwarder = forwarder;
- this.executor = executor;
- this.pluginName = pluginName;
}
@Override
public void onEvent(Event event) {
if (!Context.isForwardedEvent() && event instanceof ProjectEvent) {
- executor.execute(new EventTask(event));
- }
- }
-
- class EventTask implements Runnable {
- private final Event event;
-
- EventTask(Event event) {
- this.event = event;
- }
-
- @Override
- public void run() {
forwarder.send(event);
}
-
- @Override
- public String toString() {
- return String.format("[%s] Send event '%s' to target instance", pluginName, event.type);
- }
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventModule.java
index 28a9e8e..fe67f41 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/event/EventModule.java
@@ -17,14 +17,11 @@
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.events.EventListener;
-import java.util.concurrent.Executor;
public class EventModule extends LifecycleModule {
@Override
protected void configure() {
- bind(Executor.class).annotatedWith(EventExecutor.class).toProvider(EventExecutorProvider.class);
- listener().to(EventExecutorProvider.class);
DynamicSet.bind(binder(), EventListener.class).to(EventHandler.class);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/CacheEntry.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/CacheEntry.java
index ed9898f..6dca538 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/CacheEntry.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/CacheEntry.java
@@ -15,6 +15,7 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
import com.ericsson.gerrit.plugins.highavailability.cache.Constants;
+import java.util.Objects;
/** Represents a cache entry to evict */
public class CacheEntry {
@@ -48,6 +49,22 @@
return key;
}
+ @Override
+ public boolean equals(Object o) { // Equal when plugin name, cache name and key all match.
+ if (!(o instanceof CacheEntry)) {
+ return false;
+ }
+ CacheEntry other = (CacheEntry) o;
+ return Objects.equals(pluginName, other.pluginName)
+ && Objects.equals(cacheName, other.cacheName)
+ && Objects.equals(key, other.key);
+ }
+
+ @Override
+ public int hashCode() { // Hashes the same three fields that equals() compares.
+ return Objects.hash(pluginName, cacheName, key);
+ }
+
/**
* Build a CacheEntry from the specified cache and key
*
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
index b1a595c..b2f8141 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
@@ -19,6 +19,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* Index an account using {@link AccountIndexer}. This class is meant to be used on the receiving
@@ -36,13 +37,14 @@
}
@Override
- protected void doIndex(Account.Id id, Optional<IndexEvent> indexEvent) {
+ protected CompletableFuture<Boolean> doIndex(Account.Id id, Optional<IndexEvent> indexEvent) {
indexer.index(id);
log.atFine().log("Account %s successfully indexed", id);
+ return CompletableFuture.completedFuture(true); // indexer.index(id) has returned, so the future is already complete.
}
@Override
- protected void doDelete(Account.Id id, Optional<IndexEvent> indexEvent) {
+ protected CompletableFuture<Boolean> doDelete(Account.Id id, Optional<IndexEvent> indexEvent) { // Always throws.
throw new UnsupportedOperationException("Delete from account index not supported");
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java
index dee8876..bf48f76 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java
@@ -14,14 +14,13 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl.Factory;
import com.ericsson.gerrit.plugins.highavailability.index.ForwardedBatchIndexExecutor;
import com.google.gerrit.server.index.change.ChangeIndexer;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import java.util.concurrent.ScheduledExecutorService;
+import dev.failsafe.FailsafeExecutor;
@Singleton
public class ForwardedIndexBatchChangeHandler extends ForwardedIndexChangeHandler {
@@ -29,10 +28,9 @@
@Inject
ForwardedIndexBatchChangeHandler(
ChangeIndexer indexer,
- Configuration config,
- @ForwardedBatchIndexExecutor ScheduledExecutorService indexExecutor,
+ @ForwardedBatchIndexExecutor FailsafeExecutor<Boolean> indexExecutor, // Batch-qualified executor binding.
OneOffRequestContext oneOffCtx,
Factory changeCheckerFactory) {
- super(indexer, config, indexExecutor, oneOffCtx, changeCheckerFactory);
+ super(indexer, indexExecutor, oneOffCtx, changeCheckerFactory); // Configuration is no longer passed up.
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
index 9663205..ac102ff 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
@@ -14,8 +14,6 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.ericsson.gerrit.plugins.highavailability.Configuration.Index;
import com.ericsson.gerrit.plugins.highavailability.index.ChangeChecker;
import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl;
import com.ericsson.gerrit.plugins.highavailability.index.ForwardedIndexExecutor;
@@ -30,11 +28,11 @@
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import dev.failsafe.FailsafeExecutor;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
/**
* Index a change using {@link ChangeIndexer}. This class is meant to be used on the receiving side
@@ -45,36 +43,35 @@
@Singleton
public class ForwardedIndexChangeHandler extends ForwardedIndexingHandler<String> {
private final ChangeIndexer indexer;
- private final ScheduledExecutorService indexExecutor;
+ private final FailsafeExecutor<Boolean> indexExecutor;
private final OneOffRequestContext oneOffCtx;
- private final int retryInterval;
- private final int maxTries;
private final ChangeCheckerImpl.Factory changeCheckerFactory;
@Inject
ForwardedIndexChangeHandler(
ChangeIndexer indexer,
- Configuration config,
- @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
+ @ForwardedIndexExecutor FailsafeExecutor<Boolean> indexExecutor, // Retry settings are no longer read in this constructor.
OneOffRequestContext oneOffCtx,
ChangeCheckerImpl.Factory changeCheckerFactory) {
this.indexer = indexer;
this.indexExecutor = indexExecutor;
this.oneOffCtx = oneOffCtx;
this.changeCheckerFactory = changeCheckerFactory;
-
- Index indexConfig = config.index();
- this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
- this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
}
@Override
- protected void doIndex(String id, Optional<IndexEvent> indexEvent) throws IOException {
- doIndex(id, indexEvent, 0);
+ protected CompletableFuture<Boolean> doIndex(String id, Optional<IndexEvent> indexEvent)
+ throws IOException {
+ return indexExecutor.getAsync( // Hands one indexing attempt to the injected FailsafeExecutor.
+ () -> {
+ try (ManualRequestContext ctx = oneOffCtx.open()) { // Request context is closed when the attempt ends.
+ Context.setForwardedEvent(true); // NOTE(review): the flag is not cleared in this lambda - confirm intended.
+ return indexOnce(id, indexEvent); // The boolean result becomes the value of the returned future.
+ }
+ });
}
- private void doIndex(String id, Optional<IndexEvent> indexEvent, int retryCount)
- throws IOException {
+ private boolean indexOnce(String id, Optional<IndexEvent> indexEvent) throws Exception {
try {
ChangeChecker checker = changeCheckerFactory.create(id);
Optional<ChangeNotes> changeNotes;
@@ -89,33 +86,25 @@
reindex(notes);
if (checker.isChangeUpToDate(indexEvent)) {
- if (retryCount > 0) {
- log.atWarning().log(
- "Change %s has been eventually indexed after %d attempt(s)", id, retryCount);
- } else {
- log.atFine().log("Change %s successfully indexed", id);
- }
- } else {
- log.atWarning().log(
- "Change %s seems too old compared to the event timestamp (event-Ts=%s >> change-Ts=%s)",
- id, indexEvent, checker);
- rescheduleIndex(id, indexEvent, retryCount + 1);
+ log.atFine().log("Change %s successfully indexed", id);
+ return true;
}
- } else {
+
log.atWarning().log(
- "Change %s not present yet in local Git repository (event=%s) after %d attempt(s)",
- id, indexEvent, retryCount);
- if (!rescheduleIndex(id, indexEvent, retryCount + 1)) {
- log.atSevere().log(
- "Change %s could not be found in the local Git repository (event=%s)",
- id, indexEvent);
- }
+ "Change %s seems too old compared to the event timestamp (event-Ts=%s >> change-Ts=%s)",
+ id, indexEvent, checker);
+ return false;
}
+
+ log.atWarning().log(
+ "Change %s not present yet in local Git repository (event=%s)", id, indexEvent);
+ return false;
+
} catch (Exception e) {
if (isCausedByNoSuchChangeException(e)) {
indexer.delete(parseChangeId(id));
log.atWarning().withCause(e).log("Error trying to index Change %s. Deleted from index", id);
- return;
+ return true;
}
throw e;
@@ -127,35 +116,12 @@
indexer.index(notes);
}
- private boolean rescheduleIndex(String id, Optional<IndexEvent> indexEvent, int retryCount) {
- if (retryCount > maxTries) {
- log.atSevere().log(
- "Change %s could not be indexed after %d retries. Change index could be stale.",
- id, retryCount);
- return false;
- }
-
- log.atWarning().log(
- "Retrying for the #%d time to index Change %s after %d msecs",
- retryCount, id, retryInterval);
- indexExecutor.schedule(
- () -> {
- try (ManualRequestContext ctx = oneOffCtx.open()) {
- Context.setForwardedEvent(true);
- doIndex(id, indexEvent, retryCount);
- } catch (Exception e) {
- log.atWarning().withCause(e).log("Change %s could not be indexed", id);
- }
- },
- retryInterval,
- TimeUnit.MILLISECONDS);
- return true;
- }
-
@Override
- protected void doDelete(String id, Optional<IndexEvent> indexEvent) throws IOException {
+ protected CompletableFuture<Boolean> doDelete(String id, Optional<IndexEvent> indexEvent)
+ throws IOException {
indexer.delete(parseChangeId(id));
log.atFine().log("Change %s successfully deleted from index", id);
+ return CompletableFuture.completedFuture(true); // The delete call above has returned, so the future is already complete.
}
private static Change.Id parseChangeId(String id) {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
index ab31659..99ac369 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
@@ -19,6 +19,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* Index a group using {@link GroupIndexer}. This class is meant to be used on the receiving side of
@@ -36,13 +37,16 @@
}
@Override
- protected void doIndex(AccountGroup.UUID uuid, Optional<IndexEvent> indexEvent) {
+ protected CompletableFuture<Boolean> doIndex(
+ AccountGroup.UUID uuid, Optional<IndexEvent> indexEvent) {
indexer.index(uuid);
log.atFine().log("Group %s successfully indexed", uuid);
+ return CompletableFuture.completedFuture(true); // indexer.index(uuid) has returned, so the future is already complete.
}
@Override
- protected void doDelete(AccountGroup.UUID uuid, Optional<IndexEvent> indexEvent) {
+ protected CompletableFuture<Boolean> doDelete(
+ AccountGroup.UUID uuid, Optional<IndexEvent> indexEvent) { // Always throws.
throw new UnsupportedOperationException("Delete from group index not supported");
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
index 20ffefd..b16cb6e 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
@@ -19,6 +19,7 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* Index a project using {@link ProjectIndexer}. This class is meant to be used on the receiving
@@ -36,13 +37,16 @@
}
@Override
- protected void doIndex(Project.NameKey projectName, Optional<IndexEvent> indexEvent) {
+ protected CompletableFuture<Boolean> doIndex(
+ Project.NameKey projectName, Optional<IndexEvent> indexEvent) {
indexer.index(projectName);
log.atFine().log("Project %s successfully indexed", projectName);
+ return CompletableFuture.completedFuture(true); // indexer.index(projectName) has returned, so the future is already complete.
}
@Override
- protected void doDelete(Project.NameKey projectName, Optional<IndexEvent> indexEvent) {
+ protected CompletableFuture<Boolean> doDelete(
+ Project.NameKey projectName, Optional<IndexEvent> indexEvent) { // Always throws.
throw new UnsupportedOperationException("Delete from project index not supported");
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
index 44e35a8..6ce925a 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
@@ -19,6 +19,7 @@
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -41,9 +42,11 @@
}
}
- protected abstract void doIndex(T id, Optional<IndexEvent> indexEvent) throws IOException;
+ protected abstract CompletableFuture<Boolean> doIndex(T id, Optional<IndexEvent> indexEvent)
+ throws IOException; // index() returns this future to its caller unchanged for Operation INDEX.
- protected abstract void doDelete(T id, Optional<IndexEvent> indexEvent) throws IOException;
+ protected abstract CompletableFuture<Boolean> doDelete(T id, Optional<IndexEvent> indexEvent)
+ throws IOException; // index() returns this future to its caller unchanged for Operation DELETE.
/**
* Index an item in the local node, indexing will not be forwarded to the other node.
@@ -53,29 +56,27 @@
* @param indexEvent The index event details.
* @throws IOException If an error occur while indexing.
*/
- public void index(T id, Operation operation, Optional<IndexEvent> indexEvent) throws IOException {
+ public CompletableFuture<Boolean> index(
+ T id, Operation operation, Optional<IndexEvent> indexEvent) throws IOException {
log.atFine().log("%s %s %s", operation, id, indexEvent);
if (inFlightIndexing.add(id)) {
try {
Context.setForwardedEvent(true);
switch (operation) {
case INDEX:
- doIndex(id, indexEvent);
- break;
+ return doIndex(id, indexEvent); // NOTE(review): finally runs on return, possibly before this future completes - confirm.
case DELETE:
- doDelete(id, indexEvent);
- break;
+ return doDelete(id, indexEvent); // Same hand-off as INDEX: the handler's future is returned as-is.
default:
log.atSevere().log("unexpected operation: %s", operation);
- break;
+ return CompletableFuture.completedFuture(false); // Unknown operation yields an already-completed false result.
}
} finally {
Context.unsetForwardedEvent();
inFlightIndexing.remove(id);
}
- } else {
- throw new InFlightIndexedException(
- String.format("Indexing for %s %s %s already in flight", operation, id, indexEvent));
}
+ throw new InFlightIndexedException( // Reached only when the id was already present in inFlightIndexing.
+ String.format("Indexing for %s %s %s already in flight", operation, id, indexEvent));
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java
new file mode 100644
index 0000000..f7af2df
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+public class AddToProjectList extends Command { // Command that carries a single project name.
+ static final String TYPE = "add-to-project-list"; // Read reflectively by CommandDeserializer to map type to class.
+
+ private final String projectName;
+
+ public AddToProjectList(String projectName) {
+ super(TYPE); // Command keeps this string in its public type field.
+ this.projectName = projectName;
+ }
+
+ public String getProjectName() { // Returns the name given at construction, unmodified.
+ return projectName;
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java
similarity index 62%
copy from src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
copy to src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java
index 9b33ca3..a258d91 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,13 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.cache;
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
+public class Command { // Base class of the command types listed in CommandDeserializer.
+ public final String type; // Discriminator: CommandDeserializer reads the JSON "type" member to choose a subclass.
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface CacheExecutor {}
+ protected Command(String type) { // Subclasses pass their own TYPE constant here.
+ this.type = type;
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java
new file mode 100644
index 0000000..b75f409
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java
@@ -0,0 +1,72 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CommandDeserializer implements JsonDeserializer<Command> { // Picks the Command subclass from "type".
+
+ private static final List<Class<? extends Command>> CMD_CLASSES =
+ List.of(
+ IndexChange.Update.class,
+ IndexChange.Delete.class,
+ IndexAccount.class,
+ IndexGroup.class,
+ IndexProject.class,
+ EvictCache.class,
+ PostEvent.class,
+ AddToProjectList.class,
+ RemoveFromProjectList.class);
+ private static final Map<String, Class<? extends Command>> TYPE_TO_CLASS = new HashMap<>();
+
+ static { // Index every command class by the value of its static TYPE constant.
+ for (Class<? extends Command> clazz : CMD_CLASSES) {
+ try {
+ Field type = clazz.getDeclaredField("TYPE");
+ TYPE_TO_CLASS.put((String) type.get(null), clazz);
+ } catch (ReflectiveOperationException e) { // Missing or inaccessible TYPE field; name the offending class.
+ throw new IllegalStateException("Cannot read TYPE of " + clazz.getName(), e);
+ }
+ }
+ }
+
+ @Override
+ public Command deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
+ throws JsonParseException { // Thrown for non-objects and for a missing, non-string or unknown "type".
+ if (!json.isJsonObject()) {
+ throw new JsonParseException("Not an object");
+ }
+ JsonElement typeJson = json.getAsJsonObject().get("type");
+ if (typeJson == null
+ || !typeJson.isJsonPrimitive()
+ || !typeJson.getAsJsonPrimitive().isString()) {
+ throw new JsonParseException("Type is not a string: " + typeJson);
+ }
+ String type = typeJson.getAsJsonPrimitive().getAsString();
+ Class<? extends Command> commandClass = TYPE_TO_CLASS.get(type);
+ if (commandClass == null) {
+ throw new JsonParseException("Unknown command type: " + type);
+ }
+ return context.deserialize(json, commandClass); // Field mapping for the concrete class is delegated to Gson.
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java
new file mode 100644
index 0000000..c4ae6d0
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java
@@ -0,0 +1,36 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+public class EvictCache extends Command { // Command that carries a cache name and a key string.
+ static final String TYPE = "evict-cache"; // Read reflectively by CommandDeserializer to map type to class.
+
+ private final String cacheName;
+ private final String keyJson; // Presumably the key serialized as JSON - verify against the sender.
+
+ protected EvictCache(String cacheName, String keyJson) { // Protected, unlike AddToProjectList's public constructor.
+ super(TYPE);
+ this.cacheName = cacheName;
+ this.keyJson = keyJson;
+ }
+
+ public String getCacheName() {
+ return cacheName;
+ }
+
+ public String getKeyJson() { // Returned as the stored string; no parsing happens in this class.
+ return keyJson;
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java
new file mode 100644
index 0000000..8f81832
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java
@@ -0,0 +1,53 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeExecutor;
+import dev.failsafe.RetryPolicy;
+
+public class FailsafeExecutorProvider implements Provider<FailsafeExecutor<Boolean>> {
+ private static final FluentLogger log = FluentLogger.forEnclosingClass();
+ private final Configuration cfg;
+ private final WorkQueue workQueue;
+
+ @Inject
+ FailsafeExecutorProvider(Configuration cfg, WorkQueue workQueue) {
+ this.cfg = cfg;
+ this.workQueue = workQueue;
+ }
+
+ @Override
+ public FailsafeExecutor<Boolean> get() { // Builds a retrying executor from the cfg.jgroups() settings.
+ RetryPolicy<Boolean> retryPolicy =
+ RetryPolicy.<Boolean>builder()
+ .withMaxAttempts(cfg.jgroups().maxTries())
+ .withDelay(cfg.jgroups().retryInterval())
+ .onRetry(e -> log.atFine().log("Retrying event %s", e))
+ .onRetriesExceeded(
+ e ->
+ log.atWarning().log(
+ "%d jgroups retries exceeded for event %s", cfg.jgroups().maxTries(), e))
+ .handleResult(false) // Per Failsafe docs, a false result is handled as a failure - confirm.
+ .build();
+ return Failsafe.with(retryPolicy)
+ .with(workQueue.createQueue(cfg.jgroups().threadPoolSize(), "JGroupsForwarder")); // NOTE(review): a queue per get() call - confirm singleton binding.
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java
similarity index 60%
copy from src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
copy to src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java
index 9b33ca3..7d9bbc4 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,13 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.cache;
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
+public class IndexAccount extends Command { // Command that carries a numeric account id.
+ static final String TYPE = "index-account"; // Read reflectively by CommandDeserializer to map type to class.
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
+ private final int id;
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface CacheExecutor {}
+ public IndexAccount(int id) {
+ super(TYPE); // Command keeps this string in its public type field.
+ this.id = id;
+ }
+
+ public int getId() { // Returns the id given at construction.
+ return id;
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java
new file mode 100644
index 0000000..501a722
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java
@@ -0,0 +1,62 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.google.common.base.Strings;
+
+public abstract class IndexChange extends Command { // Shared state for the nested Update and Delete commands.
+ private final String projectName;
+ private final int id;
+ private final boolean batchMode;
+
+ protected IndexChange(String type, String projectName, int id, boolean batchMode) {
+ super(type); // Each nested subclass passes its own TYPE constant.
+ this.projectName = projectName;
+ this.id = id;
+ this.batchMode = batchMode;
+ }
+
+ public String getId() { // Formats as project~number; a null project name yields a leading "~".
+ return Strings.nullToEmpty(projectName) + "~" + id;
+ }
+
+ public boolean isBatch() {
+ return batchMode;
+ }
+
+ public static class Update extends IndexChange {
+ static final String TYPE = "update-change"; // Read reflectively by CommandDeserializer.
+
+ public Update(String projectName, int id) { // Convenience overload: batchMode is false.
+ this(projectName, id, false);
+ }
+
+ public Update(String projectName, int id, boolean batchMode) {
+ super(TYPE, projectName, id, batchMode);
+ }
+ }
+
+ public static class Delete extends IndexChange {
+ static final String TYPE = "delete-change"; // Read reflectively by CommandDeserializer.
+
+ public Delete(int id) { // Convenience overload: uses an empty project name.
+ this("", id);
+ }
+
+ public Delete(String projectName, int id) {
+ super(TYPE, projectName, id, false); // Delete commands always have batchMode false.
+ }
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java
new file mode 100644
index 0000000..b5c9f78
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+public class IndexGroup extends Command { // Command that carries a group UUID as a string.
+ static final String TYPE = "index-group"; // Read reflectively by CommandDeserializer to map type to class.
+
+ private final String uuid;
+
+ protected IndexGroup(String uuid) { // Protected, unlike AddToProjectList's public constructor.
+ super(TYPE);
+ this.uuid = uuid;
+ }
+
+ public String getUuid() { // Returns the string given at construction, unmodified.
+ return uuid;
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java
new file mode 100644
index 0000000..4d8214d
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+/**
+ * Command that carries the name of a project. It is tagged with the {@code "index-project"} type,
+ * which is handed to the {@link Command} constructor.
+ */
+public class IndexProject extends Command {
+  static final String TYPE = "index-project";
+
+  // Assigned once in the constructor and never reassigned, so it is final like the payload
+  // fields of the sibling commands.
+  private final String projectName;
+
+  /**
+   * Creates the command for one project.
+   *
+   * @param projectName project name, stored as given and returned by {@link #getProjectName()}
+   */
+  protected IndexProject(String projectName) {
+    super(TYPE);
+    this.projectName = projectName;
+  }
+
+  /** Returns the project name supplied at construction time. */
+  public String getProjectName() {
+    return projectName;
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java
new file mode 100644
index 0000000..40a358e
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java
@@ -0,0 +1,159 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.Configuration.JGroups;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.Event;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import dev.failsafe.FailsafeExecutor;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.jgroups.Address;
+import org.jgroups.ObjectMessage;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+
+/**
+ * {@link Forwarder} that turns every operation into a {@link Command}, serializes it to JSON and
+ * casts it to the JGroups cluster through a {@link MessageDispatcher}.
+ *
+ * <p>All public methods submit the send to the injected {@link FailsafeExecutor} and return the
+ * future it produces. The {@code IndexEvent} arguments are accepted but not used here.
+ */
+@Singleton
+public class JGroupsForwarder implements Forwarder {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  private final MessageDispatcher dispatcher;
+  private final JGroups jgroupsConfig;
+  private final Gson gson;
+  private final FailsafeExecutor<Boolean> executor;
+
+  @Inject
+  JGroupsForwarder(
+      MessageDispatcher dispatcher,
+      Configuration cfg,
+      @JGroupsGson Gson gson,
+      @JGroupsForwarderExecutor FailsafeExecutor<Boolean> executor) {
+    this.dispatcher = dispatcher;
+    this.jgroupsConfig = cfg.jgroups();
+    this.gson = gson;
+    this.executor = executor;
+  }
+
+  /** Sends an {@link IndexAccount} command for {@code accountId}. */
+  @Override
+  public CompletableFuture<Boolean> indexAccount(int accountId, IndexEvent indexEvent) {
+    return execute(new IndexAccount(accountId));
+  }
+
+  /** Sends an {@link IndexChange.Update} command built from the project name and change id. */
+  @Override
+  public CompletableFuture<Boolean> indexChange(
+      String projectName, int changeId, IndexEvent indexEvent) {
+    return execute(new IndexChange.Update(projectName, changeId));
+  }
+
+  /** Sends an {@link IndexChange.Update} command with its batch flag set to {@code true}. */
+  @Override
+  public CompletableFuture<Boolean> batchIndexChange(
+      String projectName, int changeId, IndexEvent indexEvent) {
+    return execute(new IndexChange.Update(projectName, changeId, true));
+  }
+
+  /** Sends an {@link IndexChange.Delete} command for {@code changeId}. */
+  @Override
+  public CompletableFuture<Boolean> deleteChangeFromIndex(int changeId, IndexEvent indexEvent) {
+    return execute(new IndexChange.Delete(changeId));
+  }
+
+  /** Sends an {@link IndexGroup} command for {@code uuid}. */
+  @Override
+  public CompletableFuture<Boolean> indexGroup(String uuid, IndexEvent indexEvent) {
+    return execute(new IndexGroup(uuid));
+  }
+
+  /** Sends an {@link IndexProject} command for {@code projectName}. */
+  @Override
+  public CompletableFuture<Boolean> indexProject(String projectName, IndexEvent indexEvent) {
+    return execute(new IndexProject(projectName));
+  }
+
+  /** Sends a {@link PostEvent} command wrapping {@code event}. */
+  @Override
+  public CompletableFuture<Boolean> send(Event event) {
+    return execute(new PostEvent(event));
+  }
+
+  /** Sends an {@link EvictCache} command; the key travels as its JSON representation. */
+  @Override
+  public CompletableFuture<Boolean> evict(String cacheName, Object key) {
+    String keyJson = gson.toJson(key);
+    return execute(new EvictCache(cacheName, keyJson));
+  }
+
+  /** Sends an {@link AddToProjectList} command for {@code projectName}. */
+  @Override
+  public CompletableFuture<Boolean> addToProjectList(String projectName) {
+    return execute(new AddToProjectList(projectName));
+  }
+
+  /** Sends a {@link RemoveFromProjectList} command for {@code projectName}. */
+  @Override
+  public CompletableFuture<Boolean> removeFromProjectList(String projectName) {
+    return execute(new RemoveFromProjectList(projectName));
+  }
+
+  /** Submits one send of {@code cmd} to the Failsafe executor. */
+  private CompletableFuture<Boolean> execute(Command cmd) {
+    return executor.getAsync(() -> executeOnce(cmd));
+  }
+
+  /**
+   * Performs a single send attempt.
+   *
+   * @return {@code true} if the response list is non-empty and every response value equals {@code
+   *     Boolean.TRUE}; {@code false} if the current view has fewer than two members, if any
+   *     response differs, or if an exception is thrown after serialization
+   */
+  private boolean executeOnce(Command cmd) {
+    String payload = gson.toJson(cmd);
+    try {
+      logJGroupsInfo();
+
+      int viewSize = dispatcher.getChannel().getView().size();
+      if (viewSize < 2) {
+        log.atFine().log("Less than two members in cluster, not sending %s", payload);
+        return false;
+      }
+
+      log.atFine().log("Sending %s", payload);
+      // NOTE(review): GET_FIRST presumably stops waiting after the first reply, yet the loop
+      // below inspects every entry of the list; confirm the outcome with more than one peer.
+      RequestOptions requestOptions =
+          new RequestOptions(ResponseMode.GET_FIRST, jgroupsConfig.timeout().toMillis());
+      // A null destination list and a null message destination are passed through as-is.
+      RspList<Object> responses =
+          dispatcher.castMessage(null, new ObjectMessage(null, payload), requestOptions);
+
+      log.atFine().log("Received response list length = %s", responses.size());
+      if (responses.isEmpty()) {
+        return false;
+      }
+
+      for (Entry<Address, Rsp<Object>> response : responses.entrySet()) {
+        log.atFine().log("Response object %s", response);
+        Object answer = response.getValue().getValue();
+        if (Boolean.TRUE.equals(answer)) {
+          continue;
+        }
+        log.atWarning().log(
+            "Received a non TRUE response from receiver %s: %s", response.getKey(), answer);
+        return false;
+      }
+
+      log.atFine().log("Successfully sent message %s", payload);
+      return true;
+    } catch (Exception e) {
+      log.atWarning().withCause(e).log("Forwarding %s failed", payload);
+      return false;
+    }
+  }
+
+  /** Logs, at fine level, this node's address followed by every member of the current view. */
+  private void logJGroupsInfo() {
+    log.atFine().log("My address: %s", dispatcher.getChannel().getAddress());
+    List<Address> viewMembers = dispatcher.getChannel().getView().getMembers();
+    for (Address member : viewMembers) {
+      log.atFine().log("Member: %s", member);
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderExecutor.java
similarity index 80%
copy from src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
copy to src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderExecutor.java
index 9b33ca3..6d2bedf 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderExecutor.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.cache;
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
-@Retention(RUNTIME)
@BindingAnnotation
-@interface CacheExecutor {}
+@Retention(RUNTIME)
+public @interface JGroupsForwarderExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java
new file mode 100644
index 0000000..58e7aeb
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java
@@ -0,0 +1,55 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
+import com.ericsson.gerrit.plugins.highavailability.peers.jgroups.JChannelProviderModule;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gson.Gson;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.TypeLiteral;
+import dev.failsafe.FailsafeExecutor;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+
+/**
+ * Guice module for the JGroups forwarder: binds {@link Forwarder}, the singleton {@link
+ * MessageDispatcher}, its {@link RequestHandler}, the lifecycle listener and the annotated Failsafe
+ * executor, and installs {@link JChannelProviderModule}.
+ */
+public class JGroupsForwarderModule extends LifecycleModule {
+
+  @Override
+  protected void configure() {
+    bind(Forwarder.class).to(JGroupsForwarder.class);
+    bind(MessageDispatcher.class).toProvider(MessageDispatcherProvider.class).in(Scopes.SINGLETON);
+    bind(RequestHandler.class).to(MessageProcessor.class);
+    install(new JChannelProviderModule());
+    listener().to(OnStartStop.class);
+
+    // The anonymous TypeLiteral subclass keeps the generic parameter available to Guice.
+    TypeLiteral<FailsafeExecutor<Boolean>> executorType =
+        new TypeLiteral<FailsafeExecutor<Boolean>>() {};
+    bind(executorType)
+        .annotatedWith(JGroupsForwarderExecutor.class)
+        .toProvider(FailsafeExecutorProvider.class)
+        .in(Scopes.SINGLETON);
+  }
+
+  /**
+   * Provides the {@code @JGroupsGson} instance: a copy of the event Gson with a {@link
+   * CommandDeserializer} registered for {@link Command}.
+   */
+  @Provides
+  @Singleton
+  @JGroupsGson
+  Gson buildJGroupsGson(@EventGson Gson eventGson) {
+    CommandDeserializer commandAdapter = new CommandDeserializer();
+    return eventGson.newBuilder().registerTypeAdapter(Command.class, commandAdapter).create();
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsGson.java
similarity index 81%
rename from src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
rename to src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsGson.java
index 9b33ca3..a4ca15a 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsGson.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.cache;
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
-@Retention(RUNTIME)
@BindingAnnotation
-@interface CacheExecutor {}
+@Retention(RUNTIME)
+public @interface JGroupsGson {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageDispatcherProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageDispatcherProvider.java
new file mode 100644
index 0000000..495ea8f
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageDispatcherProvider.java
@@ -0,0 +1,68 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.Configuration.JGroups;
+import com.ericsson.gerrit.plugins.highavailability.peers.jgroups.InetAddressFinder;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.net.InetAddress;
+import java.util.Optional;
+import org.jgroups.JChannel;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.blocks.RequestHandler;
+
+/**
+ * Provides the {@link MessageDispatcher} used for forwarding. Calling {@link #get()} connects the
+ * injected {@link JChannel} to the configured cluster as a side effect.
+ */
+@Singleton
+public class MessageDispatcherProvider implements Provider<MessageDispatcher> {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  private final InetAddressFinder finder;
+  private final JGroups jgroupsConfig;
+  private final RequestHandler requestHandler;
+  private final JChannel channel;
+
+  @Inject
+  MessageDispatcherProvider(
+      InetAddressFinder finder,
+      Configuration pluginConfiguration,
+      RequestHandler requestHandler,
+      JChannel channel) {
+    this.finder = finder;
+    this.jgroupsConfig = pluginConfiguration.jgroups();
+    this.requestHandler = requestHandler;
+    this.channel = channel;
+  }
+
+  /**
+   * Configures and connects the channel, then wraps it in a dispatcher.
+   *
+   * @return a new dispatcher that hands incoming requests to the injected {@link RequestHandler}
+   * @throws ProvisionException wrapping any exception raised while configuring or connecting
+   */
+  @Override
+  public MessageDispatcher get() {
+    try {
+      // The transport bind address is only overridden when the finder returns one.
+      Optional<InetAddress> address = finder.findAddress();
+      if (address.isPresent()) {
+        channel.getProtocolStack().getTransport().setBindAddress(address.get());
+      }
+      channel.setDiscardOwnMessages(true);
+      channel.connect(jgroupsConfig.clusterName());
+      log.atInfo().log("Successfully joined jgroups cluster %s", channel.getClusterName());
+      return new MessageDispatcher(channel, requestHandler);
+    } catch (Exception e) {
+      throw new ProvisionException("Could not create a JChannel", e);
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java
new file mode 100644
index 0000000..54096ac
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java
@@ -0,0 +1,156 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheEntry;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedCacheEvictionHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedEventHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexAccountHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexBatchChangeHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexChangeHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedProjectListUpdateHandler;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Account;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import java.util.Optional;
+import org.jgroups.Message;
+import org.jgroups.blocks.RequestHandler;
+
+/**
+ * JGroups {@link RequestHandler} that decodes an incoming JSON {@link Command} and passes it to
+ * the matching forwarded-operation handler.
+ *
+ * <p>The value returned by {@link #handle(Message)} is a {@code Boolean}: {@code false} when the
+ * selected handler threw, {@code true} otherwise.
+ */
+@Singleton
+public class MessageProcessor implements RequestHandler {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  private final Gson gson;
+  private final ForwardedIndexChangeHandler indexChangeHandler;
+  private final ForwardedIndexBatchChangeHandler indexBatchChangeHandler;
+  private final ForwardedIndexAccountHandler indexAccountHandler;
+  private final ForwardedCacheEvictionHandler cacheEvictionHandler;
+  private final ForwardedEventHandler eventHandler;
+  private final ForwardedProjectListUpdateHandler projectListUpdateHandler;
+
+  @Inject
+  MessageProcessor(
+      @JGroupsGson Gson gson,
+      ForwardedIndexChangeHandler indexChangeHandler,
+      ForwardedIndexBatchChangeHandler indexBatchChangeHandler,
+      ForwardedIndexAccountHandler indexAccountHandler,
+      ForwardedCacheEvictionHandler cacheEvictionHandler,
+      ForwardedEventHandler eventHandler,
+      ForwardedProjectListUpdateHandler projectListUpdateHandler) {
+    this.gson = gson;
+    this.indexChangeHandler = indexChangeHandler;
+    this.indexBatchChangeHandler = indexBatchChangeHandler;
+    this.indexAccountHandler = indexAccountHandler;
+    this.cacheEvictionHandler = cacheEvictionHandler;
+    this.eventHandler = eventHandler;
+    this.projectListUpdateHandler = projectListUpdateHandler;
+  }
+
+  /**
+   * Decodes {@code msg} and runs the corresponding handler with the forwarded-event context flag
+   * set; the flag is always cleared again before returning.
+   *
+   * @param msg message whose object payload is the JSON form of a {@link Command}
+   * @return {@code Boolean.FALSE} if the handler threw, {@code Boolean.TRUE} otherwise (including
+   *     for commands that have no handler here)
+   * @throws RuntimeException if the payload cannot be parsed; this is rethrown before the context
+   *     flag is touched
+   */
+  @Override
+  public Object handle(Message msg) {
+    Command cmd = getCommand(msg);
+    // Resolved up front so the failure paths can name the command without touching it again.
+    String cmdType = cmd == null ? "null" : cmd.getClass().getName();
+
+    Context.setForwardedEvent(true);
+    try {
+
+      if (cmd instanceof IndexChange) {
+        IndexChange indexChange = (IndexChange) cmd;
+        Operation op = getOperation(indexChange);
+        try {
+          ForwardedIndexChangeHandler handler =
+              indexChange.isBatch() ? indexBatchChangeHandler : indexChangeHandler;
+          handler.index(indexChange.getId(), op, Optional.empty());
+          log.atFine().log(
+              "Change index %s on change %s done", op.name().toLowerCase(), indexChange.getId());
+        } catch (Exception e) {
+          log.atSevere().withCause(e).log(
+              "Change index %s on change %s failed", op.name().toLowerCase(), indexChange.getId());
+          return false;
+        }
+
+      } else if (cmd instanceof IndexAccount) {
+        IndexAccount indexAccount = (IndexAccount) cmd;
+        try {
+          indexAccountHandler.index(
+              Account.id(indexAccount.getId()), Operation.INDEX, Optional.empty());
+          log.atFine().log("Account index update on account %s done", indexAccount.getId());
+        } catch (IOException e) {
+          log.atSevere().withCause(e).log(
+              "Account index update on account %s failed", indexAccount.getId());
+          return false;
+        }
+
+      } else if (cmd instanceof EvictCache) {
+        EvictCache evictCommand = (EvictCache) cmd;
+        cacheEvictionHandler.evict(
+            CacheEntry.from(evictCommand.getCacheName(), evictCommand.getKeyJson()));
+        log.atFine().log(
+            "Cache eviction %s %s done", evictCommand.getCacheName(), evictCommand.getKeyJson());
+
+      } else if (cmd instanceof PostEvent) {
+        Event event = ((PostEvent) cmd).getEvent();
+        try {
+          eventHandler.dispatch(event);
+          log.atFine().log("Dispatching event %s done", event);
+        } catch (PermissionBackendException e) {
+          log.atSevere().withCause(e).log("Dispatching event %s failed", event);
+          return false;
+        }
+
+      } else if (cmd instanceof AddToProjectList) {
+        String projectName = ((AddToProjectList) cmd).getProjectName();
+        projectListUpdateHandler.update(projectName, false);
+
+      } else if (cmd instanceof RemoveFromProjectList) {
+        String projectName = ((RemoveFromProjectList) cmd).getProjectName();
+        projectListUpdateHandler.update(projectName, true);
+
+      } else {
+        // IndexGroup and IndexProject commands currently end up here. The sender is still
+        // answered with TRUE, as before, so this warning is the only trace that nothing ran.
+        // NOTE(review): confirm whether these should be wired to handlers or answered FALSE.
+        log.atWarning().log("No handler for command of type %s; nothing was done", cmdType);
+      }
+
+      return true;
+    } catch (Exception e) {
+      // Reached by anything thrown outside the inner try blocks above; record the cause
+      // instead of dropping it.
+      log.atSevere().withCause(e).log("Handling command of type %s failed", cmdType);
+      return false;
+    } finally {
+      Context.unsetForwardedEvent();
+    }
+  }
+
+  /**
+   * Maps the concrete {@link IndexChange} subtype to the indexing operation.
+   *
+   * @throws IllegalArgumentException for any subtype other than {@code Update} or {@code Delete}
+   */
+  private Operation getOperation(IndexChange cmd) {
+    if (cmd instanceof IndexChange.Update) {
+      return Operation.INDEX;
+    } else if (cmd instanceof IndexChange.Delete) {
+      return Operation.DELETE;
+    } else {
+      throw new IllegalArgumentException("Unknown type of IndexChange command " + cmd.getClass());
+    }
+  }
+
+  /**
+   * Reads the message payload as a JSON string and deserializes it as a {@link Command}.
+   *
+   * @throws RuntimeException any runtime failure while casting or parsing, logged and rethrown
+   */
+  private Command getCommand(Message msg) {
+    try {
+      String s = (String) msg.getObject();
+      log.atFine().log("Received message: %s", s);
+      return gson.fromJson(s, Command.class);
+    } catch (RuntimeException e) {
+      log.atSevere().withCause(e).log("Error parsing message %s", msg.getObject());
+      throw e;
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/OnStartStop.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/OnStartStop.java
new file mode 100644
index 0000000..fad84f2
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/OnStartStop.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import org.jgroups.blocks.MessageDispatcher;
+
+/**
+ * Lifecycle hook that does nothing on start and, on stop, closes the dispatcher's channel and
+ * then the dispatcher itself.
+ */
+@Singleton
+public class OnStartStop implements LifecycleListener {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  private final MessageDispatcher dispatcher;
+
+  @Inject
+  public OnStartStop(MessageDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  /** Intentionally empty. */
+  @Override
+  public void start() {}
+
+  /** Closes the channel first, then the dispatcher. */
+  @Override
+  public void stop() {
+    log.atInfo().log("Closing JChannel");
+    dispatcher.getChannel().close();
+    closeDispatcher();
+  }
+
+  /** Closes the dispatcher; an {@link IOException} is logged rather than propagated. */
+  private void closeDispatcher() {
+    try {
+      dispatcher.close();
+    } catch (IOException e) {
+      log.atSevere().withCause(e).log("Could not close the MessageDispatcher");
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java
new file mode 100644
index 0000000..49627a7
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import com.google.gerrit.server.events.Event;
+
+/**
+ * Command that carries a Gerrit {@link Event}. It is tagged with the {@code "post-event"} type,
+ * which is handed to the {@link Command} constructor.
+ */
+public class PostEvent extends Command {
+  static final String TYPE = "post-event";
+
+  private final Event event;
+
+  /**
+   * Creates the command around one event.
+   *
+   * @param event event to carry, stored as given and returned by {@link #getEvent()}
+   */
+  protected PostEvent(Event event) {
+    super(TYPE);
+    this.event = event;
+  }
+
+  /** Returns the event supplied at construction time. */
+  public Event getEvent() {
+    return this.event;
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java
new file mode 100644
index 0000000..8eb0a3d
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+/**
+ * Command that carries a project name. It is tagged with the {@code "remove-from-project-list"}
+ * type, which is handed to the {@link Command} constructor.
+ */
+public class RemoveFromProjectList extends Command {
+  static final String TYPE = "remove-from-project-list";
+
+  private final String projectName;
+
+  /**
+   * Creates the command for one project.
+   *
+   * @param projectName project name, stored as given and returned by {@link #getProjectName()}
+   */
+  public RemoveFromProjectList(String projectName) {
+    super(TYPE);
+    this.projectName = projectName;
+  }
+
+  /** Returns the project name supplied at construction time. */
+  public String getProjectName() {
+    return this.projectName;
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java
index 48f3bff..ba28650 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java
@@ -28,7 +28,7 @@
import com.google.inject.Singleton;
@Singleton
-class CacheKeyJsonParser {
+public class CacheKeyJsonParser {
private final Gson gson;
@Inject
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java
new file mode 100644
index 0000000..8134220
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java
@@ -0,0 +1,61 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeExecutor;
+import dev.failsafe.Fallback;
+import dev.failsafe.RetryPolicy;
+import java.util.concurrent.Executors;
+
+/**
+ * Provides the {@link FailsafeExecutor} built from the {@code http} section of the plugin
+ * configuration: a retry policy composed with a fallback that yields {@code false}, running on a
+ * scheduled thread pool created here.
+ */
+@Singleton
+public class FailsafeExecutorProvider implements Provider<FailsafeExecutor<Boolean>> {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+  private final Configuration cfg;
+
+  @Inject
+  FailsafeExecutorProvider(Configuration cfg) {
+    this.cfg = cfg;
+  }
+
+  /**
+   * Builds the executor.
+   *
+   * <p>The retry policy treats a {@code false} result as a failure, waits the configured interval
+   * between attempts, and aborts on a {@link ForwardingException} that is not recoverable.
+   */
+  @Override
+  public FailsafeExecutor<Boolean> get() {
+    int maxTries = cfg.http().maxTries();
+
+    RetryPolicy<Boolean> retryPolicy =
+        RetryPolicy.<Boolean>builder()
+            .withMaxAttempts(maxTries)
+            .withDelay(cfg.http().retryInterval())
+            .onRetry(e -> log.atFine().log("Retrying event %s", e))
+            .onRetriesExceeded(
+                e -> log.atWarning().log("%d http retries exceeded for event %s", maxTries, e))
+            .handleResult(false)
+            .abortIf((r, e) -> isUnrecoverable(e))
+            .build();
+    Fallback<Boolean> fallbackToFalse = Fallback.<Boolean>of(() -> false);
+
+    // TODO: the executor shall be created by workQueue.createQueue(...)
+    // However, this currently doesn't work because WorkQueue.Executor doesn't support wrapping of
+    // Callable i.e. it throws an exception on decorateTask(Callable)
+    return Failsafe.with(fallbackToFalse, retryPolicy)
+        .with(Executors.newScheduledThreadPool(cfg.http().threadPoolSize()));
+  }
+
+  /** Tells whether {@code e} is a {@link ForwardingException} flagged as not recoverable. */
+  private static boolean isUnrecoverable(Throwable e) {
+    return e instanceof ForwardingException && !((ForwardingException) e).isRecoverable();
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java
index 0510aed..a71e03f 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProvider.java
@@ -68,10 +68,11 @@
}
private RequestConfig customRequestConfig() {
+ int connectionTimeout = (int) cfg.http().connectionTimeout().toMillis();
return RequestConfig.custom()
- .setConnectTimeout(cfg.http().connectionTimeout())
- .setSocketTimeout(cfg.http().socketTimeout())
- .setConnectionRequestTimeout(cfg.http().connectionTimeout())
+ .setConnectTimeout(connectionTimeout)
+ .setSocketTimeout((int) cfg.http().socketTimeout().toMillis())
+ .setConnectionRequestTimeout(connectionTimeout)
.build();
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index c0eb6ae..922fbd6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -29,6 +29,7 @@
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Provider;
+import dev.failsafe.FailsafeExecutor;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -49,7 +50,7 @@
private final Configuration cfg;
private final Provider<Set<PeerInfo>> peerInfoProvider;
private final Gson gson;
- private final RestForwarderScheduler scheduler;
+ private FailsafeExecutor<Boolean> executor;
@Inject
RestForwarder(
@@ -58,13 +59,13 @@
Configuration cfg,
Provider<Set<PeerInfo>> peerInfoProvider,
@EventGson Gson gson,
- RestForwarderScheduler scheduler) {
+ @RestForwarderExecutor FailsafeExecutor<Boolean> executor) {
this.httpSession = httpClient;
this.pluginRelativePath = Joiner.on("/").join("plugins", pluginName);
this.cfg = cfg;
this.peerInfoProvider = peerInfoProvider;
this.gson = gson;
- this.scheduler = scheduler;
+ this.executor = executor;
}
@Override
@@ -162,7 +163,7 @@
RequestMethod method, String action, String endpoint, Object id, Object payload) {
return peerInfoProvider.get().stream()
.map(peer -> createRequest(method, peer, action, endpoint, id, payload))
- .map(scheduler::execute)
+ .map(r -> executor.getAsync(() -> r.execute()))
.reduce(
CompletableFuture.completedFuture(true),
(a, b) -> a.thenCombine(b, (left, right) -> left && right));
@@ -226,12 +227,6 @@
action, key, destination);
throw e;
}
- if (execCnt >= maxTries) {
- log.atSevere().log(
- "Failed to %s %s on %s after %d tries; giving up",
- action, key, destination, maxTries);
- throw e;
- }
}
return false;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderExecutor.java
similarity index 81%
copy from src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
copy to src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderExecutor.java
index 9b33ca3..c8dd34f 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderExecutor.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.cache;
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
-@Retention(RUNTIME)
@BindingAnnotation
-@interface CacheExecutor {}
+@Retention(RUNTIME)
+public @interface RestForwarderExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java
index e535d0f..39c9cc0 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java
@@ -17,6 +17,8 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import dev.failsafe.FailsafeExecutor;
import org.apache.http.impl.client.CloseableHttpClient;
public class RestForwarderModule extends AbstractModule {
@@ -26,5 +28,10 @@
bind(CloseableHttpClient.class).toProvider(HttpClientProvider.class).in(Scopes.SINGLETON);
bind(HttpSession.class);
bind(Forwarder.class).to(RestForwarder.class);
+
+ bind(new TypeLiteral<FailsafeExecutor<Boolean>>() {})
+ .annotatedWith(RestForwarderExecutor.class)
+ .toProvider(FailsafeExecutorProvider.class)
+ .in(Scopes.SINGLETON);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
deleted file mode 100644
index ebbd940..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
-
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-@Singleton
-public class RestForwarderScheduler {
- private static final FluentLogger log = FluentLogger.forEnclosingClass();
- private final ScheduledExecutorService executor;
- private final long retryIntervalMs;
-
- public class CompletablePromise<V> extends CompletableFuture<V> {
- private Future<V> future;
-
- public CompletablePromise(Future<V> future) {
- this.future = future;
- executor.execute(this::tryToComplete);
- }
-
- private void tryToComplete() {
- if (future.isDone()) {
- try {
- complete(future.get());
- } catch (InterruptedException e) {
- completeExceptionally(e);
- } catch (ExecutionException e) {
- completeExceptionally(e.getCause());
- }
- return;
- }
-
- if (future.isCancelled()) {
- cancel(true);
- return;
- }
-
- executor.execute(this::tryToComplete);
- }
- }
-
- @Inject
- public RestForwarderScheduler(
- WorkQueue workQueue, Configuration cfg, Provider<Set<PeerInfo>> peerInfoProvider) {
- int executorSize = peerInfoProvider.get().size() * cfg.index().threadPoolSize();
- retryIntervalMs = cfg.index().retryInterval();
- this.executor = workQueue.createQueue(executorSize, "RestForwarderScheduler");
- }
-
- @VisibleForTesting
- public RestForwarderScheduler(ScheduledExecutorService executor) {
- this.executor = executor;
- retryIntervalMs = 0;
- }
-
- public CompletableFuture<Boolean> execute(RestForwarder.Request request) {
- return execute(request, 0);
- }
-
- public CompletableFuture<Boolean> execute(RestForwarder.Request request, long delayMs) {
- return supplyAsync(
- request.toString(),
- () -> {
- try {
- if (!request.execute()) {
- log.atWarning().log(
- "Rescheduling %s for retry after %d msec", request, retryIntervalMs);
- return execute(request, retryIntervalMs);
- }
- return CompletableFuture.completedFuture(true);
- } catch (ForwardingException e) {
- log.atSevere().withCause(e).log("Forwarding of %s has failed", request);
- return CompletableFuture.completedFuture(false);
- }
- },
- executor,
- delayMs);
- }
-
- private CompletableFuture<Boolean> supplyAsync(
- String taskName,
- Supplier<CompletableFuture<Boolean>> fn,
- ScheduledExecutorService executor,
- long delayMs) {
- BooleanAsyncSupplier asyncSupplier = new BooleanAsyncSupplier(taskName, fn);
- executor.schedule(asyncSupplier, delayMs, TimeUnit.MILLISECONDS);
- return asyncSupplier.future();
- }
-
- static class BooleanAsyncSupplier implements Runnable {
- private CompletableFuture<CompletableFuture<Boolean>> dep;
- private Supplier<CompletableFuture<Boolean>> fn;
- private String taskName;
-
- BooleanAsyncSupplier(String taskName, Supplier<CompletableFuture<Boolean>> fn) {
- this.taskName = taskName;
- this.dep = new CompletableFuture<>();
- this.fn = fn;
- }
-
- public CompletableFuture<Boolean> future() {
- return dep.thenCompose(Function.identity());
- }
-
- @Override
- public void run() {
- try {
- dep.complete(fn.get());
- } catch (Throwable ex) {
- dep.completeExceptionally(ex);
- }
- }
-
- @Override
- public String toString() {
- return taskName;
- }
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutor.java
deleted file mode 100644
index 06da9f0..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2021 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.index;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface BatchIndexExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
index 252b7e8..af730b7 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
@@ -19,7 +19,7 @@
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.HumanComment;
import com.google.gerrit.entities.RefNames;
-import com.google.gerrit.server.CommentsUtil;
+import com.google.gerrit.server.DraftCommentsReader;
import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.notedb.ChangeNotes;
@@ -38,7 +38,7 @@
public class ChangeCheckerImpl implements ChangeChecker {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
private final GitRepositoryManager gitRepoMgr;
- private final CommentsUtil commentsUtil;
+ private final DraftCommentsReader draftCommentsReader;
private final OneOffRequestContext oneOffReqCtx;
private final String changeId;
private final ChangeFinder changeFinder;
@@ -52,13 +52,13 @@
@Inject
public ChangeCheckerImpl(
GitRepositoryManager gitRepoMgr,
- CommentsUtil commentsUtil,
+ DraftCommentsReader draftCommentsReader,
ChangeFinder changeFinder,
OneOffRequestContext oneOffReqCtx,
@Assisted String changeId) {
this.changeFinder = changeFinder;
this.gitRepoMgr = gitRepoMgr;
- this.commentsUtil = commentsUtil;
+ this.draftCommentsReader = draftCommentsReader;
this.oneOffReqCtx = oneOffReqCtx;
this.changeId = changeId;
}
@@ -189,7 +189,8 @@
private long getTsFromChangeAndDraftComments(ChangeNotes notes) {
Change change = notes.getChange();
Timestamp changeTs = Timestamp.from(change.getLastUpdatedOn());
- for (HumanComment comment : commentsUtil.draftByChange(changeNotes.get())) {
+ for (HumanComment comment :
+ draftCommentsReader.getDraftsByChangeForAllAuthors(changeNotes.get())) {
Timestamp commentTs = comment.writtenOn;
changeTs = commentTs.after(changeTs) ? commentTs : changeTs;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
index f5bc85e..a3a7e64 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2021 The Android Open Source Project
+// Copyright (C) 2024 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.gerrit.common.UsedAt;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -23,8 +24,13 @@
@Singleton
class ForwardedBatchIndexExecutorProvider extends ExecutorProvider {
+ @UsedAt(UsedAt.Project.PLUGIN_MULTI_SITE)
+ public static final String FORWARDED_BATCH_INDEX_EVENT_THREAD_PREFIX =
+ "Forwarded-BatchIndex-Event";
+
@Inject
ForwardedBatchIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
- super(workQueue, config.index().batchThreadPoolSize(), "Forwarded-BatchIndex-Event");
+ super(
+ workQueue, config.index().batchThreadPoolSize(), FORWARDED_BATCH_INDEX_EVENT_THREAD_PREFIX);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexFailsafeExecutorProvider.java
similarity index 76%
rename from src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutorProvider.java
rename to src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexFailsafeExecutorProvider.java
index cfbd4fb..f52b3fe 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexFailsafeExecutorProvider.java
@@ -16,15 +16,15 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
-import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
-class BatchIndexExecutorProvider extends ExecutorProvider {
+class ForwardedBatchIndexFailsafeExecutorProvider extends ForwardedIndexFailsafeExecutorProvider {
@Inject
- BatchIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
- super(workQueue, config.index().batchThreadPoolSize(), "Forward-BatchIndex-Event");
+ ForwardedBatchIndexFailsafeExecutorProvider(
+ Configuration cfg, @ForwardedBatchIndexExecutor ExecutorProvider indexExecutorProvider) {
+ super(cfg, indexExecutorProvider);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
index 2112dbe..da623df 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
@@ -16,15 +16,19 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.gerrit.common.UsedAt;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
-class ForwardedIndexExecutorProvider extends ExecutorProvider {
+public class ForwardedIndexExecutorProvider extends ExecutorProvider {
+
+ @UsedAt(UsedAt.Project.PLUGIN_MULTI_SITE)
+ public static final String FORWARDED_INDEX_EVENT_THREAD_PREFIX = "Forwarded-Index-Event";
@Inject
ForwardedIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
- super(workQueue, config.index().threadPoolSize(), "Forwarded-Index-Event");
+ super(workQueue, config.index().threadPoolSize(), FORWARDED_INDEX_EVENT_THREAD_PREFIX);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexFailsafeExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexFailsafeExecutorProvider.java
new file mode 100644
index 0000000..7bae94a
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexFailsafeExecutorProvider.java
@@ -0,0 +1,57 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeExecutor;
+import dev.failsafe.RetryPolicy;
+import java.io.IOException;
+
+@Singleton
+public class ForwardedIndexFailsafeExecutorProvider implements Provider<FailsafeExecutor<Boolean>> {
+ protected static final FluentLogger log = FluentLogger.forEnclosingClass();
+ private final Configuration cfg;
+ private final ExecutorProvider indexExecutorProvider;
+
+ @Inject
+ public ForwardedIndexFailsafeExecutorProvider(
+ Configuration cfg, @ForwardedIndexExecutor ExecutorProvider indexExecutorProvider) {
+ this.cfg = cfg;
+ this.indexExecutorProvider = indexExecutorProvider;
+ }
+
+ @Override
+ public FailsafeExecutor<Boolean> get() {
+ RetryPolicy<Boolean> retryPolicy =
+ RetryPolicy.<Boolean>builder()
+ .withMaxAttempts(cfg.index().maxTries())
+ .withDelay(cfg.index().retryInterval())
+ .onRetry(e -> log.atFine().log("Retrying event %s", e))
+ .onRetriesExceeded(
+ e ->
+ log.atWarning().log(
+ "%d index retries exceeded for event %s", cfg.index().maxTries(), e))
+ .handleResult(false)
+ .abortOn(IOException.class)
+ .build();
+ return Failsafe.with(retryPolicy).with(indexExecutorProvider.get());
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 4f77508..ec2a091 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -14,24 +14,16 @@
package com.ericsson.gerrit.plugins.highavailability.index;
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
-import com.google.common.base.Objects;
import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.events.AccountIndexedListener;
import com.google.gerrit.extensions.events.ChangeIndexedListener;
import com.google.gerrit.extensions.events.GroupIndexedListener;
import com.google.gerrit.extensions.events.ProjectIndexedListener;
import com.google.inject.Inject;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.Optional;
class IndexEventHandler
implements ChangeIndexedListener,
@@ -39,37 +31,16 @@
GroupIndexedListener,
ProjectIndexedListener {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
- private final ScheduledExecutorService executor;
- private final ScheduledExecutorService batchExecutor;
private final Forwarder forwarder;
- private final String pluginName;
- private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final ChangeCheckerImpl.Factory changeChecker;
private final CurrentRequestContext currCtx;
- private final IndexEventLocks locks;
-
- private final int retryInterval;
- private final int maxTries;
@Inject
IndexEventHandler(
- @IndexExecutor ScheduledExecutorService executor,
- @BatchIndexExecutor ScheduledExecutorService batchExecutor,
- @PluginName String pluginName,
- Forwarder forwarder,
- ChangeCheckerImpl.Factory changeChecker,
- CurrentRequestContext currCtx,
- Configuration cfg,
- IndexEventLocks locks) {
+ Forwarder forwarder, ChangeCheckerImpl.Factory changeChecker, CurrentRequestContext currCtx) {
this.forwarder = forwarder;
- this.executor = executor;
- this.batchExecutor = batchExecutor;
- this.pluginName = pluginName;
this.changeChecker = changeChecker;
this.currCtx = currCtx;
- this.locks = locks;
- this.retryInterval = cfg.http().retryInterval();
- this.maxTries = cfg.http().maxTries();
}
@Override
@@ -77,10 +48,7 @@
currCtx.onlyWithContext(
(ctx) -> {
if (!Context.isForwardedEvent()) {
- IndexAccountTask task = new IndexAccountTask(id);
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
+ forwarder.indexAccount(id, new IndexEvent());
}
});
}
@@ -91,30 +59,19 @@
}
private void executeIndexChangeTask(String projectName, int id) {
-
if (!Context.isForwardedEvent()) {
String changeId = projectName + "~" + id;
try {
- changeChecker
- .create(changeId)
- .newIndexEvent()
- .map(
- event -> {
- if (Thread.currentThread().getName().contains("Batch")) {
- return new BatchIndexChangeTask(projectName, id, event);
- }
- return new IndexChangeTask(projectName, id, event);
- })
- .ifPresent(
- task -> {
- if (queuedTasks.add(task)) {
- if (task instanceof BatchIndexChangeTask) {
- batchExecutor.execute(task);
- } else {
- executor.execute(task);
- }
- }
- });
+ Optional<IndexEvent> indexEvent = changeChecker.create(changeId).newIndexEvent();
+ if (indexEvent.isEmpty()) {
+ return;
+ }
+
+ if (Thread.currentThread().getName().contains("Batch")) {
+ forwarder.batchIndexChange(projectName, id, indexEvent.get());
+ } else {
+ forwarder.indexChange(projectName, id, indexEvent.get());
+ }
} catch (Exception e) {
log.atWarning().withCause(e).log("Unable to create task to reindex change %s", changeId);
}
@@ -124,296 +81,21 @@
@Override
public void onChangeDeleted(int id) {
if (!Context.isForwardedEvent()) {
- DeleteChangeTask task = new DeleteChangeTask(id, new IndexEvent());
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
+ forwarder.deleteChangeFromIndex(id, new IndexEvent());
}
}
@Override
public void onProjectIndexed(String projectName) {
if (!Context.isForwardedEvent()) {
- IndexProjectTask task = new IndexProjectTask(projectName);
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
+ forwarder.indexProject(projectName, new IndexEvent());
}
}
@Override
public void onGroupIndexed(String groupUUID) {
if (!Context.isForwardedEvent()) {
- IndexGroupTask task = new IndexGroupTask(groupUUID);
- if (queuedTasks.add(task)) {
- executor.execute(task);
- }
- }
- }
-
- abstract class IndexTask implements Runnable {
- protected final IndexEvent indexEvent;
- private int retryCount = 0;
-
- IndexTask() {
- indexEvent = new IndexEvent();
- }
-
- IndexTask(IndexEvent indexEvent) {
- this.indexEvent = indexEvent;
- }
-
- @Override
- public void run() {
- locks.withLock(
- this,
- () -> {
- queuedTasks.remove(this);
- return execute();
- },
- this::reschedule);
- }
-
- private void reschedule() {
- if (++retryCount <= maxTries) {
- log.atFine().log("Retrying %d times to %s", retryCount, this);
- executor.schedule(this, retryInterval, TimeUnit.MILLISECONDS);
- } else {
- log.atSevere().log("Failed to %s after %d tries; giving up", this, maxTries);
- }
- }
-
- abstract CompletableFuture<Boolean> execute();
-
- abstract String indexId();
- }
-
- class IndexChangeTask extends IndexTask {
- private final int changeId;
- private final String projectName;
-
- IndexChangeTask(String projectName, int changeId, IndexEvent indexEvent) {
- super(indexEvent);
- this.projectName = projectName;
- this.changeId = changeId;
- }
-
- @Override
- public CompletableFuture<Boolean> execute() {
- return forwarder.indexChange(projectName, changeId, indexEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(IndexChangeTask.class, changeId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof IndexChangeTask)) {
- return false;
- }
- IndexChangeTask other = (IndexChangeTask) obj;
- return changeId == other.changeId;
- }
-
- @Override
- public String toString() {
- return String.format("[%s] Index change %s in target instance", pluginName, changeId);
- }
-
- @Override
- String indexId() {
- return "change/" + changeId;
- }
- }
-
- class BatchIndexChangeTask extends IndexTask {
- private final int changeId;
- private final String projectName;
-
- BatchIndexChangeTask(String projectName, int changeId, IndexEvent indexEvent) {
- super(indexEvent);
- this.projectName = projectName;
- this.changeId = changeId;
- }
-
- @Override
- public CompletableFuture<Boolean> execute() {
- return forwarder.batchIndexChange(projectName, changeId, indexEvent);
- }
-
- @Override
- String indexId() {
- return "change/" + changeId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(IndexEventHandler.BatchIndexChangeTask.class, changeId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof IndexEventHandler.BatchIndexChangeTask)) {
- return false;
- }
- IndexEventHandler.BatchIndexChangeTask other = (IndexEventHandler.BatchIndexChangeTask) obj;
- return changeId == other.changeId;
- }
-
- @Override
- public String toString() {
- return String.format("[%s] Index change %s in target instance", pluginName, changeId);
- }
- }
-
- class DeleteChangeTask extends IndexTask {
- private final int changeId;
-
- DeleteChangeTask(int changeId, IndexEvent indexEvent) {
- super(indexEvent);
- this.changeId = changeId;
- }
-
- @Override
- public CompletableFuture<Boolean> execute() {
- return forwarder.deleteChangeFromIndex(changeId, indexEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(DeleteChangeTask.class, changeId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof DeleteChangeTask)) {
- return false;
- }
- DeleteChangeTask other = (DeleteChangeTask) obj;
- return changeId == other.changeId;
- }
-
- @Override
- public String toString() {
- return String.format("[%s] Delete change %s in target instance", pluginName, changeId);
- }
-
- @Override
- String indexId() {
- return "change/" + changeId;
- }
- }
-
- class IndexAccountTask extends IndexTask {
- private final int accountId;
-
- IndexAccountTask(int accountId) {
- this.accountId = accountId;
- }
-
- @Override
- public CompletableFuture<Boolean> execute() {
- return forwarder.indexAccount(accountId, indexEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(accountId);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof IndexAccountTask)) {
- return false;
- }
- IndexAccountTask other = (IndexAccountTask) obj;
- return accountId == other.accountId;
- }
-
- @Override
- public String toString() {
- return String.format("[%s] Index account %s in target instance", pluginName, accountId);
- }
-
- @Override
- String indexId() {
- return "account/" + accountId;
- }
- }
-
- class IndexGroupTask extends IndexTask {
- private final String groupUUID;
-
- IndexGroupTask(String groupUUID) {
- this.groupUUID = groupUUID;
- }
-
- @Override
- public CompletableFuture<Boolean> execute() {
- return forwarder.indexGroup(groupUUID, indexEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(IndexGroupTask.class, groupUUID);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof IndexGroupTask)) {
- return false;
- }
- IndexGroupTask other = (IndexGroupTask) obj;
- return groupUUID.equals(other.groupUUID);
- }
-
- @Override
- public String toString() {
- return String.format("[%s] Index group %s in target instance", pluginName, groupUUID);
- }
-
- @Override
- String indexId() {
- return "group/" + groupUUID;
- }
- }
-
- class IndexProjectTask extends IndexTask {
- private final String projectName;
-
- IndexProjectTask(String projectName) {
- this.projectName = projectName;
- }
-
- @Override
- public CompletableFuture<Boolean> execute() {
- return forwarder.indexProject(projectName, indexEvent);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(IndexProjectTask.class, projectName);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof IndexProjectTask)) {
- return false;
- }
- IndexProjectTask other = (IndexProjectTask) obj;
- return projectName.equals(other.projectName);
- }
-
- @Override
- public String toString() {
- return String.format("[%s] Index project %s in target instance", pluginName, projectName);
- }
-
- @Override
- String indexId() {
- return "project/" + projectName;
+ forwarder.indexGroup(groupUUID, new IndexEvent());
}
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
deleted file mode 100644
index bbff4ee..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.index;
-
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexTask;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Striped;
-import com.google.inject.Inject;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-public class IndexEventLocks {
- private static final FluentLogger log = FluentLogger.forEnclosingClass();
-
- private static final int NUMBER_OF_INDEX_TASK_TYPES = 4;
- private static final int WAIT_TIMEOUT_MS = 5;
-
- private final Striped<Semaphore> semaphores;
-
- @Inject
- public IndexEventLocks(Configuration cfg) {
- this.semaphores =
- Striped.semaphore(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks(), 1);
- }
-
- public CompletableFuture<?> withLock(
- IndexTask id, IndexCallFunction function, VoidFunction lockAcquireTimeoutCallback) {
- String indexId = id.indexId();
- Semaphore idSemaphore = getSemaphore(indexId);
- try {
- log.atFine().log("Trying to acquire %s", id);
- if (idSemaphore.tryAcquire(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
- log.atFine().log("Acquired %s", id);
- return function
- .invoke()
- .whenComplete(
- (result, error) -> {
- try {
- log.atFine().log("Trying to release %s", id);
- idSemaphore.release();
- log.atFine().log("Released %s", id);
- } catch (Throwable t) {
- log.atSevere().withCause(t).log("Unable to release %s", id);
- throw t;
- }
- });
- }
-
- String timeoutMessage =
- String.format(
- "Acquisition of the locking of %s timed out after %d msec: consider increasing the number of shards",
- indexId, WAIT_TIMEOUT_MS);
- log.atWarning().log("%s", timeoutMessage);
- lockAcquireTimeoutCallback.invoke();
- CompletableFuture<?> failureFuture = new CompletableFuture<>();
- failureFuture.completeExceptionally(new InterruptedException(timeoutMessage));
- return failureFuture;
- } catch (InterruptedException e) {
- CompletableFuture<?> failureFuture = new CompletableFuture<>();
- failureFuture.completeExceptionally(e);
- log.atSevere().withCause(e).log("Locking of %s was interrupted; giving up", indexId);
- return failureFuture;
- }
- }
-
- @VisibleForTesting
- protected Semaphore getSemaphore(String indexId) {
- return semaphores.get(indexId);
- }
-
- @FunctionalInterface
- public interface VoidFunction {
- void invoke();
- }
-
- @FunctionalInterface
- public interface IndexCallFunction {
- CompletableFuture<?> invoke();
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutor.java
deleted file mode 100644
index 289b588..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.index;
-
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface IndexExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutorProvider.java
deleted file mode 100644
index 1d02f2c..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutorProvider.java
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.index;
-
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
-import com.google.gerrit.server.git.WorkQueue;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-@Singleton
-class IndexExecutorProvider extends ExecutorProvider {
-
- @Inject
- IndexExecutorProvider(WorkQueue workQueue, Configuration config) {
- super(workQueue, config.index().threadPoolSize(), "Forward-Index-Event");
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index 3bcc34f..1db3721 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -14,34 +14,40 @@
package com.ericsson.gerrit.plugins.highavailability.index;
+import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
import com.google.gerrit.extensions.events.AccountIndexedListener;
import com.google.gerrit.extensions.events.ChangeIndexedListener;
import com.google.gerrit.extensions.events.GroupIndexedListener;
import com.google.gerrit.extensions.events.ProjectIndexedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
-import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import java.util.concurrent.ScheduledExecutorService;
+import dev.failsafe.FailsafeExecutor;
-public class IndexModule extends LifecycleModule {
+public class IndexModule extends AbstractModule {
@Override
protected void configure() {
- bind(ScheduledExecutorService.class)
- .annotatedWith(IndexExecutor.class)
- .toProvider(IndexExecutorProvider.class);
- bind(ScheduledExecutorService.class)
+ bind(new TypeLiteral<FailsafeExecutor<Boolean>>() {})
.annotatedWith(ForwardedIndexExecutor.class)
- .toProvider(ForwardedIndexExecutorProvider.class);
- bind(IndexEventLocks.class).in(Scopes.SINGLETON);
- bind(ScheduledExecutorService.class)
- .annotatedWith(BatchIndexExecutor.class)
- .toProvider(BatchIndexExecutorProvider.class);
- bind(ScheduledExecutorService.class)
+ .toProvider(ForwardedIndexFailsafeExecutorProvider.class)
+ .in(Scopes.SINGLETON);
+ bind(ExecutorProvider.class)
+ .annotatedWith(ForwardedIndexExecutor.class)
+ .to(ForwardedIndexExecutorProvider.class)
+ .in(Scopes.SINGLETON);
+
+ bind(new TypeLiteral<FailsafeExecutor<Boolean>>() {})
.annotatedWith(ForwardedBatchIndexExecutor.class)
- .toProvider(ForwardedBatchIndexExecutorProvider.class);
- listener().to(IndexExecutorProvider.class);
+ .toProvider(ForwardedBatchIndexFailsafeExecutorProvider.class)
+ .in(Scopes.SINGLETON);
+ bind(ExecutorProvider.class)
+ .annotatedWith(ForwardedBatchIndexExecutor.class)
+ .to(ForwardedBatchIndexExecutorProvider.class)
+ .in(Scopes.SINGLETON);
+
DynamicSet.bind(binder(), ChangeIndexedListener.class)
.to(IndexEventHandler.class)
.in(Scopes.SINGLETON);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java
index 68707c2..0a7442c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java
@@ -15,6 +15,7 @@
package com.ericsson.gerrit.plugins.highavailability.peers;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.peers.jgroups.JChannelProviderModule;
import com.ericsson.gerrit.plugins.highavailability.peers.jgroups.JGroupsPeerInfoProvider;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.TypeLiteral;
@@ -37,6 +38,7 @@
case JGROUPS:
bind(new TypeLiteral<Set<PeerInfo>>() {}).toProvider(JGroupsPeerInfoProvider.class);
listener().to(JGroupsPeerInfoProvider.class);
+ install(new JChannelProviderModule());
break;
default:
throw new IllegalArgumentException("Unsupported peer info strategy: " + strategy);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/InetAddressFinder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/InetAddressFinder.java
index 4b86646..77a81b3 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/InetAddressFinder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/InetAddressFinder.java
@@ -16,6 +16,7 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.net.Inet4Address;
@@ -29,7 +30,8 @@
import java.util.Optional;
@Singleton
-class InetAddressFinder {
+public class InetAddressFinder {
+ private static final FluentLogger log = FluentLogger.forEnclosingClass();
private final boolean preferIPv4;
private final Configuration.JGroups jgroupsConfig;
@@ -48,7 +50,7 @@
*
* @return an Optional<InetAddress>
*/
- Optional<InetAddress> findAddress() throws SocketException {
+ public Optional<InetAddress> findAddress() throws SocketException {
return findFirstAppropriateAddress(Collections.list(NetworkInterface.getNetworkInterfaces()));
}
@@ -56,16 +58,28 @@
Optional<InetAddress> findFirstAppropriateAddress(List<NetworkInterface> networkInterfaces)
throws SocketException {
for (NetworkInterface ni : networkInterfaces) {
- if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast() || shouldSkip(ni.getName())) {
+ if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
+ log.atFine().log(
+ "ignoring network interface %s [isLoopback: %s, isUp: %s, supportsMulticast: %s]",
+ ni.getName(), ni.isLoopback(), ni.isUp(), ni.supportsMulticast());
+ continue;
+ }
+ if (shouldSkip(ni.getName())) {
continue;
}
Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress a = inetAddresses.nextElement();
if (preferIPv4 && a instanceof Inet4Address) {
+ log.atFine().log(
+ "using IPv4 network interface %s [hostAddress = %s, hostName: %s]",
+ ni.getName(), a.getHostAddress(), a.getHostName());
return Optional.of(a);
}
if (!preferIPv4 && a instanceof Inet6Address) {
+ log.atFine().log(
+ "using IPv6 network interface %s [hostAddress: %s, hostName: %s]",
+ ni.getName(), a.getHostAddress(), a.getHostName());
return Optional.of(a);
}
}
@@ -76,10 +90,8 @@
@VisibleForTesting
boolean shouldSkip(String name) {
for (String s : jgroupsConfig.skipInterface()) {
- if (s.endsWith("*") && name.startsWith(s.substring(0, s.length() - 1))) {
- return true;
- }
- if (name.equals(s)) {
+ if (name.equals(s) || s.endsWith("*") && name.startsWith(s.substring(0, s.length() - 1))) {
+ log.atFine().log("skipping network interface %s because it matches %s", name, s);
return true;
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JChannelProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JChannelProvider.java
new file mode 100644
index 0000000..f39e641
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JChannelProvider.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.peers.jgroups;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import com.google.inject.Singleton;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.jgroups.JChannel;
+
+@Singleton
+public class JChannelProvider implements Provider<JChannel> {
+ private final Configuration.JGroups jgroupsConfig;
+ private final Configuration.JGroupsKubernetes jgroupsKubernetesConfig;
+
+ @Inject
+ JChannelProvider(Configuration pluginConfiguration) {
+ this.jgroupsConfig = pluginConfiguration.jgroups();
+ this.jgroupsKubernetesConfig = pluginConfiguration.jgroupsKubernetes();
+ }
+
+ @Override
+ public JChannel get() {
+ Optional<Path> protocolStack = jgroupsConfig.protocolStack();
+ try {
+ if (protocolStack.isPresent()) {
+ return new JChannel(protocolStack.get().toString());
+ }
+ if (jgroupsConfig.useKubernetes()) {
+ if (jgroupsKubernetesConfig.namespace() != null) {
+ System.setProperty("KUBERNETES_NAMESPACE", jgroupsKubernetesConfig.namespace());
+ }
+ if (!jgroupsKubernetesConfig.labels().isEmpty()) {
+ System.setProperty(
+ "KUBERNETES_LABELS", String.join(",", jgroupsKubernetesConfig.labels()));
+ }
+ return new JChannel(getClass().getResource("kubernetes.xml").toString());
+ }
+ return new JChannel();
+ } catch (Exception e) {
+ throw new ProvisionException(
+ String.format(
+ "Unable to create a channel with protocol stack: %s",
+ protocolStack.map(Path::toString).orElse("default")),
+ e);
+ }
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JChannelProviderModule.java
similarity index 60%
copy from src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
copy to src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JChannelProviderModule.java
index 9b33ca3..c4764c8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutor.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JChannelProviderModule.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2015 The Android Open Source Project
+// Copyright (C) 2023 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.cache;
+package com.ericsson.gerrit.plugins.highavailability.peers.jgroups;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import com.google.inject.AbstractModule;
+import org.jgroups.JChannel;
-import com.google.inject.BindingAnnotation;
-import java.lang.annotation.Retention;
-
-@Retention(RUNTIME)
-@BindingAnnotation
-@interface CacheExecutor {}
+public class JChannelProviderModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ bind(JChannel.class).toProvider(JChannelProvider.class);
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
index 68efbec..776a0a4 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
@@ -24,7 +24,6 @@
import com.google.inject.Provider;
import com.google.inject.Singleton;
import java.net.InetAddress;
-import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
import org.jgroups.Address;
@@ -48,16 +47,8 @@
public class JGroupsPeerInfoProvider
implements Receiver, Provider<Set<PeerInfo>>, LifecycleListener {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
- private static final String JGROUPS_LOG_FACTORY_PROPERTY = "jgroups.logging.log_factory_class";
-
- static {
- if (System.getProperty(JGROUPS_LOG_FACTORY_PROPERTY) == null) {
- System.setProperty(JGROUPS_LOG_FACTORY_PROPERTY, SLF4JLogFactory.class.getName());
- }
- }
private final Configuration.JGroups jgroupsConfig;
- private final Configuration.JGroupsKubernetes jgroupsKubernetesConfig;
private final InetAddressFinder finder;
private final String myUrl;
@@ -67,11 +58,14 @@
@Inject
JGroupsPeerInfoProvider(
- Configuration pluginConfiguration, InetAddressFinder finder, MyUrlProvider myUrlProvider) {
+ Configuration pluginConfiguration,
+ InetAddressFinder finder,
+ MyUrlProvider myUrlProvider,
+ JChannel channel) {
this.jgroupsConfig = pluginConfiguration.jgroups();
- this.jgroupsKubernetesConfig = pluginConfiguration.jgroupsKubernetes();
this.finder = finder;
this.myUrl = myUrlProvider.get();
+ this.channel = channel;
}
@Override
@@ -120,7 +114,6 @@
public void connect() {
try {
- channel = getChannel();
Optional<InetAddress> address = finder.findAddress();
if (address.isPresent()) {
log.atFine().log("Protocol stack: %s", channel.getProtocolStack());
@@ -146,31 +139,6 @@
}
}
- private JChannel getChannel() throws Exception {
- Optional<Path> protocolStack = jgroupsConfig.protocolStack();
- try {
- if (protocolStack.isPresent()) {
- return new JChannel(protocolStack.get().toString());
- }
- if (jgroupsConfig.useKubernetes()) {
- if (jgroupsKubernetesConfig.namespace() != null) {
- System.setProperty("KUBERNETES_NAMESPACE", jgroupsKubernetesConfig.namespace());
- }
- if (!jgroupsKubernetesConfig.labels().isEmpty()) {
- System.setProperty(
- "KUBERNETES_LABELS", String.join(",", jgroupsKubernetesConfig.labels()));
- }
- return new JChannel(getClass().getResource("kubernetes.xml").toString());
- }
- return new JChannel();
- } catch (Exception e) {
- log.atSevere().withCause(e).log(
- "Unable to create a channel with protocol stack: %s",
- protocolStack.isPresent() ? protocolStack : "default");
- throw e;
- }
- }
-
@VisibleForTesting
void setChannel(JChannel channel) {
this.channel = channel;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/SLF4JLog.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/SLF4JLog.java
deleted file mode 100644
index 43b6f9c..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/SLF4JLog.java
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright (C) 2017 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.peers.jgroups;
-
-import org.jgroups.logging.Log;
-import org.slf4j.Logger;
-
-/**
- * A {@link Log} implementation which forwards all calls to the JGroups specific logging interface
- * to an instance of a SLF4J {@link Logger}. Only {@link #setLevel(String)} and {@link #getLevel()}
- * have no counterparts in SLF4J and are implemented as NOPs. Instances of this class are created by
- * {@link SLF4JLogFactory}. See the documentation of {@link SLF4JLogFactory} on how to configure
- * JGroups to use this class.
- *
- * @author christian.halstrick@sap.com
- */
-public class SLF4JLog implements Log {
- private final Logger logger;
-
- public SLF4JLog(Logger log) {
- this.logger = log;
- }
-
- @Override
- public boolean isFatalEnabled() {
- return logger.isErrorEnabled();
- }
-
- @Override
- public boolean isErrorEnabled() {
- return logger.isErrorEnabled();
- }
-
- @Override
- public boolean isWarnEnabled() {
- return logger.isWarnEnabled();
- }
-
- @Override
- public boolean isInfoEnabled() {
- return logger.isInfoEnabled();
- }
-
- @Override
- public boolean isDebugEnabled() {
- return logger.isDebugEnabled();
- }
-
- @Override
- public boolean isTraceEnabled() {
- return logger.isTraceEnabled();
- }
-
- @Override
- public void fatal(String msg) {
- logger.error(msg);
- }
-
- @Override
- public void fatal(String msg, Object... args) {
- logger.error(convMsg(msg, args));
- }
-
- @Override
- public void fatal(String msg, Throwable throwable) {
- logger.error(msg, throwable);
- }
-
- @Override
- public void error(String msg) {
- logger.error(msg);
- }
-
- @Override
- public void error(String format, Object... args) {
- if (isErrorEnabled()) {
- logger.error(convMsg(format, args));
- }
- }
-
- @Override
- public void error(String msg, Throwable throwable) {
- logger.error(msg, throwable);
- }
-
- @Override
- public void warn(String msg) {
- logger.warn(msg);
- }
-
- @Override
- public void warn(String msg, Object... args) {
- if (isWarnEnabled()) {
- logger.warn(convMsg(msg, args));
- }
- }
-
- @Override
- public void warn(String msg, Throwable throwable) {
- logger.warn(msg, throwable);
- }
-
- @Override
- public void info(String msg) {
- logger.info(msg);
- }
-
- @Override
- public void info(String msg, Object... args) {
- if (isInfoEnabled()) {
- logger.info(convMsg(msg, args));
- }
- }
-
- @Override
- public void debug(String msg) {
- logger.debug(msg);
- }
-
- @Override
- public void debug(String msg, Object... args) {
- if (isDebugEnabled()) {
- logger.debug(convMsg(msg, args));
- }
- }
-
- @Override
- public void debug(String msg, Throwable throwable) {
- logger.debug(msg, throwable);
- }
-
- @Override
- public void trace(Object msg) {
- logger.trace(msg.toString());
- }
-
- @Override
- public void trace(String msg) {
- logger.trace(msg);
- }
-
- @Override
- public void trace(String msg, Object... args) {
- if (isTraceEnabled()) {
- logger.trace(convMsg(msg, args));
- }
- }
-
- @Override
- public void trace(String msg, Throwable throwable) {
- logger.trace(msg, throwable);
- }
-
- private static String convMsg(String msg, Object... args) {
- return String.format(msg, args);
- }
-
- @Override
- public void setLevel(String level) {}
-
- @Override
- public String getLevel() {
- return "";
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/SLF4JLogFactory.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/SLF4JLogFactory.java
deleted file mode 100644
index 72ea803..0000000
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/SLF4JLogFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Copyright (C) 2017 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.peers.jgroups;
-
-import org.jgroups.logging.CustomLogFactory;
-import org.jgroups.logging.Log;
-import org.slf4j.LoggerFactory;
-
-/**
- * A custom log factory which takes care to provide instances of {@link SLF4JLog}. If this factory
- * is used jgroups will use the SLF4J API to emit logging and tracing. This can be helpful when
- * gerrit is running in a runtime environment which supports SLF4J logging.
- *
- * <p>To configure jgroups to use this log factory make sure the following system property is set
- * when gerrit is started:
- *
- * <p>{@code
- * -Djgroups.logging.log_factory_class=com.ericsson.gerrit.plugins.highavailability.peers.jgroups.SLF4JLogFactory}
- */
-public class SLF4JLogFactory implements CustomLogFactory {
-
- @Override
- public Log getLog(@SuppressWarnings("rawtypes") Class clazz) {
- return new SLF4JLog(LoggerFactory.getLogger(clazz));
- }
-
- @Override
- public Log getLog(String category) {
- return new SLF4JLog(LoggerFactory.getLogger(category));
- }
-}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleaner.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleaner.java
index afc6568..b4260a7 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleaner.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleaner.java
@@ -25,6 +25,7 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
+import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
@Singleton
@@ -32,7 +33,7 @@
private final WorkQueue queue;
private final Provider<CleanupTask> cleanupTaskProvider;
- private final long cleanupIntervalMillis;
+ private final Duration cleanupInterval;
private ScheduledFuture<?> scheduledCleanupTask;
static class CleanupTask implements Runnable {
@@ -64,7 +65,7 @@
WorkQueue queue, Provider<CleanupTask> cleanupTaskProvider, Configuration config) {
this.queue = queue;
this.cleanupTaskProvider = cleanupTaskProvider;
- this.cleanupIntervalMillis = config.websession().cleanupInterval();
+ this.cleanupInterval = config.websession().cleanupInterval();
}
@Override
@@ -75,7 +76,7 @@
.scheduleAtFixedRate(
cleanupTaskProvider.get(),
SECONDS.toMillis(1),
- cleanupIntervalMillis,
+ cleanupInterval.toMillis(),
MILLISECONDS);
}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 2d9c10c..d89f7b3 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -54,6 +54,25 @@
enable = true
```
+### JGroups based discovery and message transport
+
+In this case URLs of peers are not needed. All peers just need to join the same JGroups cluster
+defined by the `jgroups.clusterName`.
+```
+[main]
+ transport = jgroups
+ sharedDirectory = /directory/accessible/from/both/instances
+[autoReindex]
+ enabled = false
+[jgroups]
+ clusterName = foo
+ skipInterface = lo*
+ skipInterface = eth2
+ protocolStack = protocolStack.xml
+ timeout = 5000
+ maxTries = 100
+```
+
```main.sharedDirectory```
: Path to a directory accessible from both instances.
When given as a relative path, then it is resolved against the $SITE_PATH
@@ -62,6 +81,11 @@
directory is "/gerrit/root/shared/dir". When not specified, the default
is "shared".
+```main.transport```
+: Message transport layer. Could be: `http` or `jgroups`.
+ When not specified, the default is `http`.
+ When set to `jgroups` then all `peerInfo.*` sections are unnecessary and ignored.
+
```autoReindex.enabled```
: Enable the tracking of the latest change indexed under data/high-availability
for each of the indexes. At startup scans all the changes, accounts and groups
@@ -147,6 +171,27 @@
: A label that will be used to select the pods in the format `label=value`. Can be set
multiple times.
+```jgroups.timeout```
+: Maximum interval of time in milliseconds that JGroups waits for a response when
+ forwarding a message. Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ When not specified, the default value is 5 seconds.
+
+```jgroups.maxTries```
+: Maximum number of times JGroups should attempt to forward a message. Must be at least 1.
+ Setting this option to zero or negative will assume the default value.
+ When not specified, the default value is 720 times.
+
+```jgroups.retryInterval```
+: The interval of time in milliseconds between the subsequent auto-retries.
+ Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ When not specified, the default value is set to 10 seconds.
+
+```jgroups.threadPoolSize```
+: Maximum number of threads used to execute JGroups calls towards target instances.
+
+NOTE: the default settings for `jgroups.timeout` and `jgroups.maxTries` ensure
+that JGroups will keep retrying to forward a message for one hour.
+
NOTE: To work properly in certain environments, JGroups needs the System property
`java.net.preferIPv4Stack` to be set to `true`.
See [JGroups - Trouble shooting](http://jgroups.org/tutorial/index.html#_trouble_shooting).
@@ -163,31 +208,35 @@
```http.connectionTimeout```
: Maximum interval of time in milliseconds the plugin waits for a connection
- to the target instance. When not specified, the default value is set to 5000ms.
+ to the target instance.
+ Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ When not specified, the default value is set to 5 seconds.
```http.socketTimeout```
: Maximum interval of time in milliseconds the plugin waits for a response from the
- target instance once the connection has been established. When not specified,
- the default value is set to 5000ms.
+ target instance once the connection has been established.
+ Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ When not specified, the default value is set to 5 seconds.
```http.maxTries```
: Maximum number of times the plugin should attempt when calling a REST API in
- the target instance. Setting this value to 0 will disable retries. When not
- specified, the default value is 360. After this number of failed tries, an
+ the target instance. Must be at least 1.
+ Setting this option to zero or negative will assume the default value.
+ When not specified, the default value is 360. After this number of failed attempts, an
error is logged.
```http.retryInterval```
: The interval of time in milliseconds between the subsequent auto-retries.
- When not specified, the default value is set to 10000ms.
+ Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ When not specified, the default value is set to 10 seconds.
+
+```http.threadPoolSize```
+: Maximum number of threads used to execute REST calls towards target instances.
```cache.synchronize```
: Whether to synchronize cache evictions.
Defaults to true.
-```cache.threadPoolSize```
-: Maximum number of threads used to send cache evictions to the target instance.
- Defaults to 4.
-
```cache.pattern```
: Pattern to match names of custom caches for which evictions should be
forwarded (in addition to the core caches that are always forwarded). May be
@@ -205,14 +254,6 @@
: Whether to synchronize stream events.
Defaults to true.
-```index.numStripedLocks```
-: Number of striped locks to use during reindexing. Should be of the same order
- of magnitude of the open changes. I.e.: if one has 5000 changes, one might want
- to have at least 1000 striped locks. The value has to be tuned empirically
- by checking the number of failures in acquiring the locking. Checking the
- `consider increasing the number of shards` warnings should help.
- Defaults to 10.
-
```index.synchronize```
: Whether to synchronize secondary indexes.
Defaults to true.
@@ -223,24 +264,26 @@
Defaults to true.
```index.threadPoolSize```
-: Maximum number of threads used to send index events to the target instance.
+: Maximum number of threads used to process index events in the receiving gerrit instance.
Defaults to 4.
```index.batchThreadPoolSize```
-: Maximum number of threads used to send batch index events to the target instance
+: Maximum number of threads used to process batch index events in the receiving gerrit instance
and not associated to an interactive action performed by a user.
- Defaults equal index.threadPoolSize.
+ Defaults to `index.threadPoolSize`.
```index.maxTries```
: Maximum number of times the plugin should attempt to reindex changes.
- Setting this value to 0 will disable retries. After this number of failed tries,
- an error is logged and the local index should be considered stale and needs
- to be investigated and manually reindexed.
+ Must be at least 1.
+ Setting this option to zero or negative will assume the default value.
+ After this number of failed tries, an error is logged and the local index should be considered
+ stale and needs to be investigated and manually reindexed.
Defaults to 2.
```index.retryInterval```
: The interval of time in milliseconds between the subsequent auto-retries.
- Defaults to 30000 (30 seconds).
+ Value is expressed in Gerrit time values as in [websession.cleanupInterval](#websessioncleanupInterval).
+ Defaults to 30 seconds.
NOTE: the default settings for `http.socketTimeout` and `http.maxTries` ensure
that the plugin will keep retrying to forward a message for one hour.
diff --git a/src/test/README.md b/src/test/README.md
index 1b8e4a2..fd59f64 100644
--- a/src/test/README.md
+++ b/src/test/README.md
@@ -1,19 +1,31 @@
# Gerrit high-availability docker setup example
The Docker Compose project in the docker directory contains a simple test
-environment of two Gerrit masters in HA configuration.
+environment of two Gerrit masters in HA configuration, with their git repos
+hosted on NFS filesystem.
## How to build
-The project can be built using docker-compose.
+The project can be built using docker-compose (make sure you set the
+`platform` attribute in the docker-compose.yaml file if you're not
+in an amd64 arch).
To build the Docker VMs:
-```
- $ docker-compose build
+```bash
+ # first, remove the buildx if it exists and it's not running
+ $ docker buildx inspect docker-ha | grep Status
+ $ docker buildx rm docker-ha
+ # create the docker-ha buildx node, provide your architecture and start it up
+ docker buildx create --name docker-ha --platform "linux/amd64" --driver docker-container --use \
+ && docker buildx inspect --bootstrap \
+ && docker-compose build
```
### Building the Docker VMs using a non-default user id
+First, update the user id in the [NFS Dockerfile](./docker/nfs/Dockerfile).
+This is done simply by editing the file and setting the non-default user id.
+Then, run the following:
```
$ export GERRIT_UID=$(id -u)
$ docker-compose build --build-arg GERRIT_UID
@@ -24,6 +36,9 @@
host is not owned by you. For example, some corporate environments use a
restricted 1000 user (id). In that case, the containerized application
may fail to write towards the host (through volumes).
+**Important:** The user id in gerrit must be the same as the uid in the
+NFS server, otherwise you will encounter file ownership problems on any
+filesystem operation.
That UID will be the one set for the containerized gerrit user. Latter's
group will remain as default (1000). This is because groups known from
@@ -36,9 +51,41 @@
Use the 'up' target to startup the Docker Compose VMs.
```
- $ docker-compose up
+ $ docker-compose up -d
```
+## Background on using an NFS server
+We are using the `erichough/nfs-server` image mainly because it's easy to use
+& we had success with it. The work has been inspired by
+[this blog post](https://nothing2say.co.uk/running-a-linux-based-nfs-server-in-docker-on-windows-b64445d5ada2).
+
+The containers start with the `privileged` flag set, which is a security risk
+but necessary to work around permission issues.
+
+It is worth noting that we are exposing the `/var/gerrit/git` directory as the
+nfs-share. This is because more often than not it's the git directory that's
+shared over the network. You can change this in the nfs server and gerrit
+docker files, and in the `exports.txt` file.
+
+The NFS server is using a static IP. The Docker Compose YAML file defines a
+bridge network with the subnet `192.168.1.0/24` (this is what allows us to
+give the NFS Server a known, static IP).
+
+The `addr=192.168.1.5` option (in the `nfs-client-volume` volume) is the
+reason we need a static IP for the server (and hence a configured subnet
+for the network). Note that when using a name (i.e. addr=nfs-server) we
+weren't able to get the DNS resolution to work properly.
+
+Also in the Docker Compose file we can see that the `nfs-server` container
+uses a `healthcheck`; this is necessary to control when the `gerrit`
+services will start up (they need to start after the nfs server is fully
+up-and-running).
+
+Finally, we are providing an `exports.txt` file, which again utilises the
+subnet we provided during the bridge network creation. This file is baked
+into the image sacrificing a bit of flexibility, but we feel this is
+a small price to pay to have everything automated.
+
# Gerrit high-availability local setup example
1. Init gerrit instances with high-availability plugin installed:
diff --git a/src/test/docker/docker-compose.yaml b/src/test/docker/docker-compose.yaml
index 24140ca..f744d2e 100644
--- a/src/test/docker/docker-compose.yaml
+++ b/src/test/docker/docker-compose.yaml
@@ -2,16 +2,40 @@
services:
+ nfs-server:
+ build: nfs
+# platform: linux/arm64/v8 # uncomment for Apple Silicon arch
+ privileged: true
+ container_name: nfs-server
+ environment:
+ NFS_LOG_LEVEL: DEBUG
+ hostname: nfs-server
+ healthcheck:
+ test: ["CMD-SHELL", "sleep 10"] # required, otherwise the gerrit service will fail to start with a "connection refused" error
+ interval: 1s
+ timeout: 1m
+ retries: 10
+ ports:
+ - 2049:2049
+ networks:
+ nfs-server-bridge:
+ ipv4_address: 192.168.1.5
+ volumes:
+ - nfs-server-volume:/var/gerrit/git
gerrit-01:
build: gerrit
+ privileged: true
+ depends_on:
+ nfs-server:
+ condition: service_healthy
ports:
- "8081:8080"
- "29411:29418"
networks:
- - gerrit-net
+ nfs-server-bridge: null
volumes:
- /dev/urandom:/dev/random
- - gitvolume:/var/gerrit/git
+ - git-volume:/var/gerrit/git
- shareddir:/var/gerrit/shareddir
- ./etc/gerrit.config:/var/gerrit/etc/gerrit.config.orig
- ./etc/high-availability.gerrit-01.config:/var/gerrit/etc/high-availability.config.orig
@@ -20,16 +44,20 @@
gerrit-02:
build: gerrit
+ privileged: true
ports:
- "8082:8080"
- "29412:29418"
networks:
- - gerrit-net
+ nfs-server-bridge: null
depends_on:
- - gerrit-01
+ gerrit-01:
+ condition: service_started
+ nfs-server:
+ condition: service_healthy
volumes:
- /dev/urandom:/dev/random
- - gitvolume:/var/gerrit/git
+ - git-volume:/var/gerrit/git
- shareddir:/var/gerrit/shareddir
- ./etc/gerrit.config:/var/gerrit/etc/gerrit.config.orig
- ./etc/high-availability.gerrit-02.config:/var/gerrit/etc/high-availability.config.orig
@@ -43,7 +71,9 @@
- "80:80"
- "29418:29418"
networks:
- - gerrit-net
+ nfs-server-bridge: null
+ volumes:
+ - syslog-sidecar:/syslog-sidecar
depends_on:
- syslog-sidecar
- gerrit-01
@@ -52,12 +82,21 @@
syslog-sidecar:
build: docker-syslog-ng-stdout
networks:
- - gerrit-net
+ nfs-server-bridge: null
networks:
- gerrit-net:
- driver: bridge
+ nfs-server-bridge:
+ ipam:
+ driver: default
+ config:
+ - subnet: 192.168.1.0/24
volumes:
shareddir:
- gitvolume:
+ nfs-server-volume:
+ git-volume:
+ driver: "local"
+ driver_opts:
+ type: nfs
+ o: "addr=192.168.1.5,rw"
+ device: ":/var/gerrit/git"
diff --git a/src/test/docker/docker-syslog-ng-stdout/Dockerfile b/src/test/docker/docker-syslog-ng-stdout/Dockerfile
index 11f3059..52c1e64 100644
--- a/src/test/docker/docker-syslog-ng-stdout/Dockerfile
+++ b/src/test/docker/docker-syslog-ng-stdout/Dockerfile
@@ -1,10 +1,10 @@
-FROM alpine:3.4
+FROM alpine:3.13.4
MAINTAINER Ryan Schlesinger <ryan@outstand.com>
RUN apk add --no-cache bash syslog-ng
RUN mkdir /sidecar
-COPY config/* /etc/syslog-ng/
+COPY config/syslog-ng.conf /etc/syslog-ng/
COPY docker-entrypoint.sh /docker-entrypoint.sh
VOLUME ["/sidecar"]
CMD ["syslog-ng", "-F"]
diff --git a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-destination.out b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-destination.out
deleted file mode 100644
index 170f7e4..0000000
--- a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-destination.out
+++ /dev/null
@@ -1 +0,0 @@
- destination d_stdout { pipe("/dev/stdout"); };
diff --git a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-log.out b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-log.out
deleted file mode 100644
index 6901c41..0000000
--- a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-log.out
+++ /dev/null
@@ -1 +0,0 @@
-log { source(s_all); destination(d_stdout); };
diff --git a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-plugins.std b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-plugins.std
deleted file mode 100644
index ef0ce3e..0000000
--- a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-plugins.std
+++ /dev/null
@@ -1 +0,0 @@
-@version: 3.7
diff --git a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-source.sidecar b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-source.sidecar
deleted file mode 100644
index 1181f78..0000000
--- a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-source.sidecar
+++ /dev/null
@@ -1,5 +0,0 @@
-# sidecar log source for mounting between docker containers
- network(
- transport("udp")
- port("514")
- );
diff --git a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-source.std b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-source.std
deleted file mode 100644
index 5a50916..0000000
--- a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng-source.std
+++ /dev/null
@@ -1,8 +0,0 @@
-# ---------------------------------------------------------------------------------
-# Default syslog-ng sources; Do not edit this file!
-# append source with line on a file: syslog-ng-source.<package>
-# ---------------------------------------------------------------------------------
-# message generated by Syslog-NG
- internal();
-# standard Linux log source (this is the default place for the syslog() function to send logs to)
- unix-dgram("/dev/log");
diff --git a/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng.conf b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng.conf
new file mode 100644
index 0000000..94cdac9
--- /dev/null
+++ b/src/test/docker/docker-syslog-ng-stdout/config/syslog-ng.conf
@@ -0,0 +1,136 @@
+@version:3.30
+@include "scl.conf"
+
+# syslog-ng configuration file.
+#
+# See syslog-ng(8) and syslog-ng.conf(5) for more information.
+#
+# Note: It also sources additional configuration files (*.conf)
+# located in /etc/syslog-ng/conf.d/.
+
+#
+# Options
+#
+options {
+ # Create destination directories if missing.
+ create_dirs(yes);
+
+ # The default action of syslog-ng is to log a MARK line to the file every
+ # 20 minutes. That seems high for most people so turn it down to once an
+ # hour. Set it to zero if you don't want the functionality at all.
+ mark_freq(3600);
+
+ # The default action of syslog-ng is to log a STATS line to the file every
+ # 10 minutes. That's pretty ugly after a while. Change it to every 12 hours
+ # so you get a nice daily update of how many messages syslog-ng missed (0).
+ stats_freq(43200);
+
+ # Time to wait before a dead connection is re-established (default is 60).
+ time_reopen(5);
+
+ # Disable DNS usage.
+ # syslog-ng blocks on DNS queries, so enabling DNS may lead to a DoS attack.
+ use_dns(no);
+ dns-cache(no);
+
+ # Default owner, group, and permissions for log files.
+ owner(root);
+ group(adm);
+ perm(0640);
+
+ # Default permissions for created directories.
+ dir_perm(0755);
+};
+
+
+#
+# Templates
+#
+
+template t_file {
+ template("${YEAR}-${MONTH}-${DAY} ${HOUR}:${MIN}:${SEC} ${LEVEL} ${MSGHDR}${MSG}\n");
+};
+
+
+#
+# Sources
+#
+
+#source s_sys {
+ # Standard system log source.
+# system();
+
+ # Messages generated by syslog-ng.
+# internal();
+#};
+
+# customisation
+source s_all {
+ internal();
+ unix-dgram("/dev/log");
+ network(
+ transport("udp")
+ port(514)
+ );
+};
+
+
+#
+# Destinations
+#
+
+destination d_auth { file("/var/log/auth.log" template(t_file)); };
+destination d_boot { file("/var/log/boot.log" template(t_file)); };
+destination d_cron { file("/var/log/cron.log" template(t_file)); };
+destination d_kern { file("/var/log/kern.log" template(t_file)); };
+destination d_mail { file("/var/log/mail.log" template(t_file) flush_lines(10)); };
+destination d_mesg { file("/var/log/messages" template(t_file)); };
+
+# Send messages to console of everyone logged in.
+destination d_cons_all { usertty("*"); };
+
+# Send message to the root's console.
+destination d_cons_root { usertty("root"); };
+
+# customisation
+destination d_stdout { pipe("/dev/stdout"); };
+
+
+#
+# Filters
+#
+
+filter f_auth { facility(auth, authpriv); };
+filter f_boot { facility(local7); };
+filter f_cron { facility(cron); };
+filter f_emerg { level(emerg); };
+filter f_kern { facility(kern); };
+filter f_mail { facility(mail); };
+
+filter f_default {
+ level(info..emerg)
+ and not (facility(auth)
+ or facility(authpriv)
+ or facility(cron)
+ or facility(kern)
+ or facility(mail));
+};
+
+
+#
+# Logs
+#
+
+#log { source(s_sys); filter(f_auth); destination(d_auth); };
+#log { source(s_sys); filter(f_boot); destination(d_boot); };
+#log { source(s_sys); filter(f_cron); destination(d_cron); };
+#log { source(s_sys); filter(f_emerg); destination(d_cons_root); };
+#log { source(s_sys); filter(f_kern); destination(d_kern); };
+#log { source(s_sys); filter(f_mail); destination(d_mail); };
+#log { source(s_sys); filter(f_default); destination(d_mesg); };
+
+# customisation
+log { source(s_all); destination(d_stdout); };
+
+# Source additional configuration files (.conf extension only)
+#@include "/etc/syslog-ng/conf.d/*.conf"
\ No newline at end of file
diff --git a/src/test/docker/docker-syslog-ng-stdout/docker-entrypoint.sh b/src/test/docker/docker-syslog-ng-stdout/docker-entrypoint.sh
index be51a09..79e09ec 100755
--- a/src/test/docker/docker-syslog-ng-stdout/docker-entrypoint.sh
+++ b/src/test/docker/docker-syslog-ng-stdout/docker-entrypoint.sh
@@ -1,49 +1,4 @@
#!/bin/bash
-# The following two methods are ripped from alpine's syslog-ng package.
-# This allows us (and a user) to customize the config with snippets.
-grep_syslog_conf_entries() {
- local section="$1" FN filelist
- grep -v '^#' /etc/syslog-ng/syslog-ng-${section}.std
- filelist=$(find /etc/syslog-ng/ -maxdepth 1 -type f -name "syslog-ng-${section}.*" | grep -Ev ".backup|.std|~")
- if [ $? -eq 0 ]
- then
- for FN in ${filelist}
- do
- grep -v '^#' $FN
- done
- fi
-}
-
-update() {
- local fname='/etc/syslog-ng/syslog-ng.conf'
- local f_tmp="/etc/syslog-ng/syslog-ng.conf.$$"
- for ng_std in options source destination filter log
- do
- [ -f /etc/syslog-ng/syslog-ng-${ng_std}.std ] || exit 1
- done
- {
- # create options entries
- grep_syslog_conf_entries plugins
- echo "options {"
- grep_syslog_conf_entries options
- echo "};"
- # create source entries
- echo "source s_all {"
- grep_syslog_conf_entries source
- echo "};"
- # create destination entries
- grep_syslog_conf_entries destination
- # create filter entries
- grep_syslog_conf_entries filter
- # create log entries
- grep_syslog_conf_entries log
- } > $f_tmp
- cp -p $f_tmp $fname
- rm -f $f_tmp
-}
-
-update
-
echo Starting "$@"
exec "$@"
diff --git a/src/test/docker/gerrit/Dockerfile b/src/test/docker/gerrit/Dockerfile
index caac77f..33a7b5b 100644
--- a/src/test/docker/gerrit/Dockerfile
+++ b/src/test/docker/gerrit/Dockerfile
@@ -1,28 +1,52 @@
-FROM gerritcodereview/gerrit:3.7.7-ubuntu20
+FROM almalinux:8.5
-ENV GERRIT_BRANCH=stable-3.7
+# Install dependencies
+RUN yum -y install \
+ git \
+ java-11-openjdk \
+ procps \
+ sudo \
+ passwd \
+ gettext \
+ && yum -y clean all
-ENV GERRIT_CI_URL=https://gerrit-ci.gerritforge.com/job
+ENV GERRIT_VERSION=3.6
+ENV JAVA_HOME /usr/lib/jvm/jre-11-openjdk
-USER root
+# Add gerrit user
+RUN adduser -p -m --uid 1000 gerrit --home-dir /home/gerrit
+RUN echo "gerrit:gerrit" | chpasswd
+RUN echo "gerrit ALL=(root) NOPASSWD:ALL" > /etc/sudoers.d/gerrit && \
+ chmod 0440 /etc/sudoers.d/gerrit
-RUN apt update && apt install -y iputils-ping nmap curl lsof gettext net-tools sudo
+# Create gerrit installation directory
+RUN mkdir -p /var/gerrit && chown -R gerrit /var/gerrit
+ADD --chown=gerrit \
+ "https://gerrit-ci.gerritforge.com/job/Gerrit-bazel-stable-$GERRIT_VERSION/lastSuccessfulBuild/artifact/gerrit/bazel-bin/release.war" \
+ /tmp/gerrit.war
+
+ADD --chown=gerrit \
+"https://gerrit-ci.gerritforge.com/job/plugin-javamelody-bazel-master-stable-$GERRIT_VERSION/lastSuccessfulBuild/artifact/bazel-bin/plugins/javamelody/javamelody.jar" \
+ /var/gerrit/plugins/javamelody.jar
+ADD --chown=gerrit \
+ "https://gerrit-ci.gerritforge.com/job/plugin-high-availability-bazel-stable-$GERRIT_VERSION/lastSuccessfulBuild/artifact/bazel-bin/plugins/high-availability/high-availability.jar" \
+ /var/gerrit/plugins/high-availability.jar
+
+ADD --chown=gerrit \
+ "https://repo1.maven.org/maven2/com/gerritforge/global-refdb/$GERRIT_VERSION.3.4/global-refdb-$GERRIT_VERSION.3.4.jar" \
+ /tmp
+
+ADD --chown=gerrit:gerrit ./wait-for-it.sh /bin
+
+# Change user
USER gerrit
-ADD --chown=gerrit:gerrit $GERRIT_CI_URL/plugin-javamelody-bazel-master-$GERRIT_BRANCH/lastSuccessfulBuild/artifact/bazel-bin/plugins/javamelody/javamelody.jar /var/gerrit/plugins/javamelody.jar
-ADD --chown=gerrit:gerrit $GERRIT_CI_URL/plugin-high-availability-bazel-$GERRIT_BRANCH/lastSuccessfulBuild/artifact/bazel-bin/plugins/high-availability/high-availability.jar /var/gerrit/plugins/high-availability.jar
-ADD --chown=gerrit:gerrit $GERRIT_CI_URL/module-global-refdb-bazel-$GERRIT_BRANCH/lastSuccessfulBuild/artifact/bazel-bin/plugins/global-refdb/global-refdb.jar /var/gerrit/lib/global-refdb.jar
+# Expose ssh and http ports
+EXPOSE 29418 8080
+
+COPY --chown=gerrit ./entrypoint.sh /bin
USER root
-ADD start.sh /bin/
-ADD wait-for-it.sh /bin/
-
-RUN rm -Rf /var/gerrit/{git,index,cache}/*
-
-ARG GERRIT_UID=1000
-RUN usermod -u ${GERRIT_UID} gerrit &> /dev/null
-
-ENTRYPOINT ["/usr/bin/env"]
-CMD /bin/start.sh
+ENTRYPOINT /bin/entrypoint.sh
diff --git a/src/test/docker/gerrit/entrypoint.sh b/src/test/docker/gerrit/entrypoint.sh
new file mode 100755
index 0000000..b382a3b
--- /dev/null
+++ b/src/test/docker/gerrit/entrypoint.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+if [[ ! -z "$WAIT_FOR" ]]
+then
+ wait-for-it.sh $WAIT_FOR -t 600 -- echo "$WAIT_FOR is up"
+fi
+
+chown -R gerrit /var/gerrit/etc
+sudo -u gerrit cp /var/gerrit/etc/gerrit.config.orig /var/gerrit/etc/gerrit.config
+sudo -u gerrit cp /var/gerrit/etc/high-availability.config.orig /var/gerrit/etc/high-availability.config
+
+
+echo "Init gerrit..."
+sudo -u gerrit java -jar /tmp/gerrit.war init -d /var/gerrit --batch --install-all-plugins
+chown -R gerrit: /var/gerrit/shareddir
+
+# required until regression is fixed, see https://groups.google.com/g/repo-discuss/c/DH-ftHMiCyE/m/qF88c6KMAAAJ
+echo "Copying global ref db jar into lib"
+sudo -u gerrit cp /tmp/global-refdb-*.jar /var/gerrit/lib
+
+echo "Reindexing Gerrit..."
+cd /var/gerrit && sudo -u gerrit java -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit
+sudo -u gerrit git config -f /var/gerrit/etc/gerrit.config gerrit.canonicalWebUrl http://$HOSTNAME/
+sudo -u gerrit touch /var/gerrit/logs/{gc_log,error_log,httpd_log,sshd_log,replication_log} && tail -f /var/gerrit/logs/* | grep --line-buffered -v 'HEAD /' &
+
+echo "Running Gerrit ..."
+sudo -u gerrit /var/gerrit/bin/gerrit.sh run
diff --git a/src/test/docker/gerrit/start.sh b/src/test/docker/gerrit/start.sh
deleted file mode 100755
index d906628..0000000
--- a/src/test/docker/gerrit/start.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash -e
-
-if [[ ! -z "$WAIT_FOR" ]]
-then
- wait-for-it.sh $WAIT_FOR -t 600 -- echo "$WAIT_FOR is up"
-fi
-
-sudo -u gerrit cp /var/gerrit/etc/gerrit.config.orig /var/gerrit/etc/gerrit.config
-sudo -u gerrit cp /var/gerrit/etc/high-availability.config.orig /var/gerrit/etc/high-availability.config
-
-if [[ ! -f /var/gerrit/etc/ssh_host_ed25519_key ]]
-then
- echo "Initializing Gerrit site ..."
- sudo -u gerrit java -jar /var/gerrit/bin/gerrit.war init -d /var/gerrit --batch
- chown -R gerrit: /var/gerrit/shareddir
-fi
-
-echo "Reindexing Gerrit ..."
-sudo -u gerrit java -jar /var/gerrit/bin/gerrit.war reindex -d /var/gerrit
-sudo -u gerrit git config -f /var/gerrit/etc/gerrit.config gerrit.canonicalWebUrl http://$HOSTNAME/
-
-sudo -u gerrit touch /var/gerrit/logs/{gc_log,error_log,httpd_log,sshd_log,replication_log} && tail -f /var/gerrit/logs/* | grep --line-buffered -v 'HEAD /' &
-
-echo "Running Gerrit ..."
-sudo -u gerrit /var/gerrit/bin/gerrit.sh run
diff --git a/src/test/docker/nfs/Dockerfile b/src/test/docker/nfs/Dockerfile
new file mode 100644
index 0000000..19744f3
--- /dev/null
+++ b/src/test/docker/nfs/Dockerfile
@@ -0,0 +1,13 @@
+FROM erichough/nfs-server
+
+COPY exports.txt /etc/exports
+
+# To avoid ownership issues, the user must be the same between the
+# server and the client, hence we are creating it explicitly in both.
+RUN adduser --disabled-password --gecos "" --uid 1000 gerrit
+
+# /var/gerrit/git is the shared directory
+RUN mkdir --parents /var/gerrit/git
+
+RUN chown gerrit:gerrit /var/lib/nfs
+RUN chown gerrit:gerrit /var/gerrit/git
\ No newline at end of file
diff --git a/src/test/docker/nfs/exports.txt b/src/test/docker/nfs/exports.txt
new file mode 100644
index 0000000..c31d586
--- /dev/null
+++ b/src/test/docker/nfs/exports.txt
@@ -0,0 +1 @@
+/var/gerrit/git 192.168.1.0/24(rw,no_subtree_check,insecure)
\ No newline at end of file
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
index b6986db..e158099 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
@@ -17,9 +17,8 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.BATCH_THREAD_POOL_SIZE_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.CACHE_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_NUM_STRIPED_LOCKS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_THREAD_POOL_SIZE;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT_MS;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.ALLOWED_LISTENERS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.EVENT_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
@@ -48,7 +47,6 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Main.DEFAULT_SHARED_DIRECTORY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Main.MAIN_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Main.SHARED_DIRECTORY_KEY;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.NUM_STRIPED_LOCKS;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.PEER_INFO_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.PeerInfo.DEFAULT_PEER_INFO_STRATEGY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.PeerInfo.STRATEGY_KEY;
@@ -58,11 +56,10 @@
import static com.ericsson.gerrit.plugins.highavailability.Configuration.PeerInfoStatic.URL_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.THREAD_POOL_SIZE_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.CLEANUP_INTERVAL_KEY;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.DEFAULT_CLEANUP_INTERVAL_MS;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.DEFAULT_CLEANUP_INTERVAL;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Websession.WEBSESSION_SECTION;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth8.assertThat;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import com.ericsson.gerrit.plugins.highavailability.Configuration.PeerInfoStrategy;
@@ -74,6 +71,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -216,24 +214,24 @@
@Test
public void testGetConnectionTimeout() throws Exception {
- assertThat(getConfiguration().http().connectionTimeout()).isEqualTo(DEFAULT_TIMEOUT_MS);
+ assertThat(getConfiguration().http().connectionTimeout()).isEqualTo(DEFAULT_TIMEOUT);
globalPluginConfig.setInt(HTTP_SECTION, null, CONNECTION_TIMEOUT_KEY, TIMEOUT);
- assertThat(getConfiguration().http().connectionTimeout()).isEqualTo(TIMEOUT);
+ assertThat(getConfiguration().http().connectionTimeout().toMillis()).isEqualTo(TIMEOUT);
globalPluginConfig.setString(HTTP_SECTION, null, CONNECTION_TIMEOUT_KEY, INVALID_INT);
- assertThat(getConfiguration().http().connectionTimeout()).isEqualTo(DEFAULT_TIMEOUT_MS);
+ assertThat(getConfiguration().http().connectionTimeout()).isEqualTo(DEFAULT_TIMEOUT);
}
@Test
public void testGetSocketTimeout() throws Exception {
- assertThat(getConfiguration().http().socketTimeout()).isEqualTo(DEFAULT_TIMEOUT_MS);
+ assertThat(getConfiguration().http().socketTimeout()).isEqualTo(DEFAULT_TIMEOUT);
globalPluginConfig.setInt(HTTP_SECTION, null, SOCKET_TIMEOUT_KEY, TIMEOUT);
- assertThat(getConfiguration().http().socketTimeout()).isEqualTo(TIMEOUT);
+ assertThat(getConfiguration().http().socketTimeout().toMillis()).isEqualTo(TIMEOUT);
globalPluginConfig.setString(HTTP_SECTION, null, SOCKET_TIMEOUT_KEY, INVALID_INT);
- assertThat(getConfiguration().http().socketTimeout()).isEqualTo(DEFAULT_TIMEOUT_MS);
+ assertThat(getConfiguration().http().socketTimeout()).isEqualTo(DEFAULT_TIMEOUT);
}
@Test
@@ -245,6 +243,12 @@
globalPluginConfig.setString(HTTP_SECTION, null, MAX_TRIES_KEY, INVALID_INT);
assertThat(getConfiguration().http().maxTries()).isEqualTo(DEFAULT_MAX_TRIES);
+
+ globalPluginConfig.setString(HTTP_SECTION, null, MAX_TRIES_KEY, "0");
+ assertThat(getConfiguration().http().maxTries()).isEqualTo(DEFAULT_MAX_TRIES);
+
+ globalPluginConfig.setString(HTTP_SECTION, null, MAX_TRIES_KEY, "-1");
+ assertThat(getConfiguration().http().maxTries()).isEqualTo(DEFAULT_MAX_TRIES);
}
@Test
@@ -252,7 +256,7 @@
assertThat(getConfiguration().http().retryInterval()).isEqualTo(DEFAULT_RETRY_INTERVAL);
globalPluginConfig.setInt(HTTP_SECTION, null, RETRY_INTERVAL_KEY, RETRY_INTERVAL);
- assertThat(getConfiguration().http().retryInterval()).isEqualTo(RETRY_INTERVAL);
+ assertThat(getConfiguration().http().retryInterval().toMillis()).isEqualTo(RETRY_INTERVAL);
globalPluginConfig.setString(HTTP_SECTION, null, RETRY_INTERVAL_KEY, INVALID_INT);
assertThat(getConfiguration().http().retryInterval()).isEqualTo(DEFAULT_RETRY_INTERVAL);
@@ -297,17 +301,6 @@
}
@Test
- public void testGetCacheThreadPoolSize() throws Exception {
- assertThat(getConfiguration().cache().threadPoolSize()).isEqualTo(DEFAULT_THREAD_POOL_SIZE);
-
- globalPluginConfig.setInt(CACHE_SECTION, null, THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
- assertThat(getConfiguration().cache().threadPoolSize()).isEqualTo(THREAD_POOL_SIZE);
-
- globalPluginConfig.setString(CACHE_SECTION, null, THREAD_POOL_SIZE_KEY, INVALID_INT);
- assertThat(getConfiguration().cache().threadPoolSize()).isEqualTo(DEFAULT_THREAD_POOL_SIZE);
- }
-
- @Test
public void testGetCacheSynchronize() throws Exception {
assertThat(getConfiguration().cache().synchronize()).isEqualTo(DEFAULT_SYNCHRONIZE);
@@ -441,10 +434,10 @@
@Test
public void testGetCleanupInterval() throws Exception {
assertThat(getConfiguration().websession().cleanupInterval())
- .isEqualTo(DEFAULT_CLEANUP_INTERVAL_MS);
+ .isEqualTo(DEFAULT_CLEANUP_INTERVAL);
globalPluginConfig.setString(WEBSESSION_SECTION, null, CLEANUP_INTERVAL_KEY, "30 seconds");
- assertThat(getConfiguration().websession().cleanupInterval()).isEqualTo(SECONDS.toMillis(30));
+ assertThat(getConfiguration().websession().cleanupInterval()).isEqualTo(Duration.ofSeconds(30));
}
@Test
@@ -482,14 +475,6 @@
}
@Test
- public void testGetIndexNumStripedLocks() throws Exception {
- assertThat(getConfiguration().index().numStripedLocks()).isEqualTo(DEFAULT_NUM_STRIPED_LOCKS);
-
- globalPluginConfig.setInt(INDEX_SECTION, null, NUM_STRIPED_LOCKS, 100);
- assertThat(getConfiguration().index().numStripedLocks()).isEqualTo(100);
- }
-
- @Test
public void testGetIndexSynchronizeForced() throws Exception {
assertThat(getConfiguration().index().synchronizeForced())
.isEqualTo(DEFAULT_SYNCHRONIZE_FORCED);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandlerTest.java
index e6633d8..3d091a0 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheEvictionHandlerTest.java
@@ -14,7 +14,7 @@
package com.ericsson.gerrit.plugins.highavailability.cache;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.concurrent.Executor;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
@@ -35,11 +34,9 @@
@RunWith(MockitoJUnitRunner.class)
public class CacheEvictionHandlerTest {
- @Mock private Executor executorMock;
@Mock private Forwarder forwarder;
@Mock private PluginConfigFactory pluginConfigFactoryMock;
- private static final String PLUGIN_NAME = "high-availability";
private static final Path SITE_PATH = Paths.get("/site_path");
private CachePatternMatcher defaultCacheMatcher;
@@ -52,9 +49,9 @@
@Test
public void shouldNotPublishAccountsCacheEvictions() {
CacheEvictionHandler<String, String> handler =
- new CacheEvictionHandler<>(forwarder, executorMock, PLUGIN_NAME, defaultCacheMatcher);
+ new CacheEvictionHandler<>(forwarder, defaultCacheMatcher);
handler.onRemoval(
"test", "accounts", RemovalNotification.create("test", "accounts", RemovalCause.EXPLICIT));
- verifyZeroInteractions(executorMock);
+ verifyNoInteractions(forwarder);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutorProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutorProviderTest.java
deleted file mode 100644
index 7f7071b..0000000
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/CacheExecutorProviderTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.cache;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class CacheExecutorProviderTest {
-
- @Mock private ScheduledThreadPoolExecutor executorMock;
-
- private CacheExecutorProvider cacheExecutorProvider;
-
- @Before
- public void setUp() throws Exception {
- WorkQueue workQueueMock = mock(WorkQueue.class);
- when(workQueueMock.createQueue(4, "Forward-Cache-Eviction-Event")).thenReturn(executorMock);
- Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
- when(configMock.cache().threadPoolSize()).thenReturn(4);
-
- cacheExecutorProvider = new CacheExecutorProvider(workQueueMock, configMock);
- }
-
- @Test
- public void shouldReturnExecutor() throws Exception {
- assertThat(cacheExecutorProvider.get()).isEqualTo(executorMock);
- }
-
- @Test
- public void testStop() throws Exception {
- cacheExecutorProvider.start();
- assertThat(cacheExecutorProvider.get()).isEqualTo(executorMock);
- cacheExecutorProvider.stop();
- verify(executorMock).shutdown();
- assertThat(cacheExecutorProvider.get()).isNull();
- }
-}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandlerTest.java
index ec114f7..1bd2aed 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/cache/ProjectListUpdateHandlerTest.java
@@ -14,16 +14,13 @@
package com.ericsson.gerrit.plugins.highavailability.cache;
-import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
-import com.ericsson.gerrit.plugins.highavailability.cache.ProjectListUpdateHandler.ProjectListUpdateTask;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.NewProjectCreatedListener;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import org.junit.Before;
@@ -34,15 +31,13 @@
@RunWith(MockitoJUnitRunner.class)
public class ProjectListUpdateHandlerTest {
- private static final String PLUGIN_NAME = "high-availability";
-
private ProjectListUpdateHandler handler;
@Mock private Forwarder forwarder;
@Before
public void setUp() {
- handler = new ProjectListUpdateHandler(forwarder, MoreExecutors.directExecutor(), PLUGIN_NAME);
+ handler = new ProjectListUpdateHandler(forwarder);
}
@Test
@@ -69,23 +64,6 @@
handler.onNewProjectCreated(mock(NewProjectCreatedListener.Event.class));
handler.onProjectDeleted(mock(ProjectDeletedListener.Event.class));
Context.unsetForwardedEvent();
- verifyZeroInteractions(forwarder);
- }
-
- @Test
- public void testProjectUpdateTaskToString() throws Exception {
- String projectName = "someProjectName";
- ProjectListUpdateTask task = handler.new ProjectListUpdateTask(projectName, false);
- assertThat(task.toString())
- .isEqualTo(
- String.format(
- "[%s] Update project list in target instance: add '%s'", PLUGIN_NAME, projectName));
-
- task = handler.new ProjectListUpdateTask(projectName, true);
- assertThat(task.toString())
- .isEqualTo(
- String.format(
- "[%s] Update project list in target instance: remove '%s'",
- PLUGIN_NAME, projectName));
+ verifyNoInteractions(forwarder);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutorProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutorProviderTest.java
deleted file mode 100644
index 6e666ed..0000000
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventExecutorProviderTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.event;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class EventExecutorProviderTest {
- @Mock private ScheduledThreadPoolExecutor executorMock;
- private EventExecutorProvider eventsExecutorProvider;
-
- @Before
- public void setUp() throws Exception {
- WorkQueue workQueueMock = mock(WorkQueue.class);
- when(workQueueMock.createQueue(1, "Forward-Stream-Event")).thenReturn(executorMock);
- eventsExecutorProvider = new EventExecutorProvider(workQueueMock);
- }
-
- @Test
- public void shouldReturnExecutor() throws Exception {
- assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
- }
-
- @Test
- public void testStop() throws Exception {
- eventsExecutorProvider.start();
- assertThat(eventsExecutorProvider.get()).isEqualTo(executorMock);
- eventsExecutorProvider.stop();
- verify(executorMock).shutdown();
- assertThat(eventsExecutorProvider.get()).isNull();
- }
-}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandlerTest.java
index 663885f..988064e 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/event/EventHandlerTest.java
@@ -14,18 +14,14 @@
package com.ericsson.gerrit.plugins.highavailability.event;
-import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
-import com.ericsson.gerrit.plugins.highavailability.event.EventHandler.EventTask;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.ProjectEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -34,15 +30,13 @@
@RunWith(MockitoJUnitRunner.class)
public class EventHandlerTest {
- private static final String PLUGIN_NAME = "high-availability";
-
private EventHandler eventHandler;
@Mock private Forwarder forwarder;
@Before
public void setUp() {
- eventHandler = new EventHandler(forwarder, MoreExecutors.directExecutor(), PLUGIN_NAME);
+ eventHandler = new EventHandler(forwarder);
}
@Test
@@ -55,7 +49,7 @@
@Test
public void shouldNotForwardNonProjectEvent() throws Exception {
eventHandler.onEvent(mock(Event.class));
- verifyZeroInteractions(forwarder);
+ verifyNoInteractions(forwarder);
}
@Test
@@ -63,15 +57,6 @@
Context.setForwardedEvent(true);
eventHandler.onEvent(mock(ProjectEvent.class));
Context.unsetForwardedEvent();
- verifyZeroInteractions(forwarder);
- }
-
- @Test
- public void tesEventTaskToString() throws Exception {
- Event event = new RefUpdatedEvent();
- EventTask task = eventHandler.new EventTask(event);
- assertThat(task.toString())
- .isEqualTo(
- String.format("[%s] Send event '%s' to target instance", PLUGIN_NAME, event.type));
+ verifyNoInteractions(forwarder);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedAwareEventBrokerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedAwareEventBrokerTest.java
index 4c639ba..12c0364 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedAwareEventBrokerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedAwareEventBrokerTest.java
@@ -16,7 +16,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoInteractions;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.Event;
@@ -59,6 +59,6 @@
} finally {
Context.unsetForwardedEvent();
}
- verifyZeroInteractions(listenerMock);
+ verifyNoInteractions(listenerMock);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
index 0e9cee8..e549e50 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
@@ -16,6 +16,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@@ -46,7 +47,7 @@
@Test
public void testSuccessfulIndexing() throws Exception {
- handler.index(id, Operation.INDEX, Optional.empty());
+ handler.index(id, Operation.INDEX, Optional.empty()).get(10, SECONDS);
verify(indexerMock).index(id);
}
@@ -55,7 +56,7 @@
UnsupportedOperationException thrown =
assertThrows(
UnsupportedOperationException.class,
- () -> handler.index(id, Operation.DELETE, Optional.empty()));
+ () -> handler.index(id, Operation.DELETE, Optional.empty()).get(10, SECONDS));
assertThat(thrown).hasMessageThat().contains("Delete from account index not supported");
}
@@ -73,7 +74,7 @@
.index(id);
assertThat(Context.isForwardedEvent()).isFalse();
- handler.index(id, Operation.INDEX, Optional.empty());
+ handler.index(id, Operation.INDEX, Optional.empty()).get(10, SECONDS);
assertThat(Context.isForwardedEvent()).isFalse();
verify(indexerMock).index(id);
@@ -92,7 +93,9 @@
assertThat(Context.isForwardedEvent()).isFalse();
IOException thrown =
- assertThrows(IOException.class, () -> handler.index(id, Operation.INDEX, Optional.empty()));
+ assertThrows(
+ IOException.class,
+ () -> handler.index(id, Operation.INDEX, Optional.empty()).get(10, SECONDS));
assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
assertThat(Context.isForwardedEvent()).isFalse();
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
index fe3e3fb..427985a 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -16,7 +16,10 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -26,13 +29,18 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
import com.ericsson.gerrit.plugins.highavailability.index.ChangeChecker;
import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl;
+import com.ericsson.gerrit.plugins.highavailability.index.ForwardedIndexExecutorProvider;
+import com.ericsson.gerrit.plugins.highavailability.index.ForwardedIndexFailsafeExecutorProvider;
import com.google.gerrit.entities.Change;
import com.google.gerrit.server.index.change.ChangeIndexer;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.util.OneOffRequestContext;
+import dev.failsafe.FailsafeExecutor;
import java.io.IOException;
+import java.time.Duration;
import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,50 +61,57 @@
@Mock private ChangeIndexer indexerMock;
@Mock private ChangeNotes changeNotes;
- @Mock private Configuration configMock;
- @Mock private Configuration.Index indexMock;
- @Mock private ScheduledExecutorService indexExecutorMock;
+
+ @Mock(answer = RETURNS_DEEP_STUBS)
+ private Configuration configMock;
+
@Mock private OneOffRequestContext ctxMock;
@Mock private ChangeCheckerImpl.Factory changeCheckerFactoryMock;
@Mock private ChangeChecker changeCheckerAbsentMock;
@Mock private ChangeChecker changeCheckerPresentMock;
+ @Mock private ForwardedIndexExecutorProvider indexExecutorProviderMock;
private ForwardedIndexChangeHandler handler;
private Change.Id id;
@Before
public void setUp() throws Exception {
id = Change.id(TEST_CHANGE_NUMBER);
- when(configMock.index()).thenReturn(indexMock);
+ when(configMock.index().threadPoolSize()).thenReturn(4);
+ when(configMock.index().maxTries()).thenReturn(3);
+ when(configMock.index().retryInterval()).thenReturn(Duration.ofMillis(10));
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
+ when(indexExecutorProviderMock.get()).thenReturn(Executors.newScheduledThreadPool(2));
+ FailsafeExecutor<Boolean> indexExecutor =
+ new ForwardedIndexFailsafeExecutorProvider(configMock, indexExecutorProviderMock).get();
handler =
new ForwardedIndexChangeHandler(
- indexerMock, configMock, indexExecutorMock, ctxMock, changeCheckerFactoryMock);
+ indexerMock, indexExecutor, ctxMock, changeCheckerFactoryMock);
}
@Test
public void changeIsIndexedWhenUpToDate() throws Exception {
setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
+ handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()).get(10, SECONDS);
verify(indexerMock, times(1)).index(any(ChangeNotes.class));
}
@Test
public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.of(new IndexEvent()));
- verify(indexerMock, times(1)).index(any(ChangeNotes.class));
+ handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.of(new IndexEvent())).get(10, SECONDS);
+ verify(indexerMock, atLeast(1)).index(any(ChangeNotes.class));
}
@Test
public void changeIsDeletedFromIndex() throws Exception {
- handler.index(TEST_CHANGE_ID, Operation.DELETE, Optional.empty());
+ handler.index(TEST_CHANGE_ID, Operation.DELETE, Optional.empty()).get(10, SECONDS);
verify(indexerMock, times(1)).delete(id);
}
@Test
public void changeToIndexDoesNotExist() throws Exception {
setupChangeAccessRelatedMocks(CHANGE_DOES_NOT_EXIST, CHANGE_OUTDATED);
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
+ handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()).get(10, SECONDS);
verify(indexerMock, times(0)).delete(id);
}
@@ -115,7 +130,7 @@
.index(any(ChangeNotes.class));
assertThat(Context.isForwardedEvent()).isFalse();
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
+ handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()).get(10, SECONDS);
assertThat(Context.isForwardedEvent()).isFalse();
verify(indexerMock, times(1)).index(any(ChangeNotes.class));
@@ -134,11 +149,12 @@
.index(any(ChangeNotes.class));
assertThat(Context.isForwardedEvent()).isFalse();
- IOException thrown =
+ ExecutionException thrown =
assertThrows(
- IOException.class,
- () -> handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()));
- assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
+ ExecutionException.class,
+ () ->
+ handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()).get(10, SECONDS));
+ assertThat(thrown.getCause()).hasMessageThat().isEqualTo("someMessage");
assertThat(Context.isForwardedEvent()).isFalse();
verify(indexerMock, times(1)).index(any(ChangeNotes.class));
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
index 5fdb151..babe1d4 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -16,6 +16,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@@ -46,7 +47,7 @@
@Test
public void testSuccessfulIndexing() throws Exception {
- handler.index(uuid, Operation.INDEX, Optional.empty());
+ handler.index(uuid, Operation.INDEX, Optional.empty()).get(10, SECONDS);
verify(indexerMock).index(uuid);
}
@@ -55,7 +56,7 @@
UnsupportedOperationException thrown =
assertThrows(
UnsupportedOperationException.class,
- () -> handler.index(uuid, Operation.DELETE, Optional.empty()));
+ () -> handler.index(uuid, Operation.DELETE, Optional.empty()).get(10, SECONDS));
assertThat(thrown).hasMessageThat().contains("Delete from group index not supported");
}
@@ -73,7 +74,7 @@
.index(uuid);
assertThat(Context.isForwardedEvent()).isFalse();
- handler.index(uuid, Operation.INDEX, Optional.empty());
+ handler.index(uuid, Operation.INDEX, Optional.empty()).get(10, SECONDS);
assertThat(Context.isForwardedEvent()).isFalse();
verify(indexerMock).index(uuid);
@@ -93,7 +94,8 @@
assertThat(Context.isForwardedEvent()).isFalse();
IOException thrown =
assertThrows(
- IOException.class, () -> handler.index(uuid, Operation.INDEX, Optional.empty()));
+ IOException.class,
+ () -> handler.index(uuid, Operation.INDEX, Optional.empty()).get(10, SECONDS));
assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
assertThat(Context.isForwardedEvent()).isFalse();
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
index 0c896b3..f91a83e 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -16,6 +16,7 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
@@ -46,7 +47,7 @@
@Test
public void testSuccessfulIndexing() throws Exception {
- handler.index(nameKey, Operation.INDEX, Optional.empty());
+ handler.index(nameKey, Operation.INDEX, Optional.empty()).get(10, SECONDS);
verify(indexerMock).index(nameKey);
}
@@ -55,7 +56,7 @@
UnsupportedOperationException thrown =
assertThrows(
UnsupportedOperationException.class,
- () -> handler.index(nameKey, Operation.DELETE, Optional.empty()));
+ () -> handler.index(nameKey, Operation.DELETE, Optional.empty()).get(10, SECONDS));
assertThat(thrown).hasMessageThat().contains("Delete from project index not supported");
}
@@ -73,7 +74,7 @@
.index(nameKey);
assertThat(Context.isForwardedEvent()).isFalse();
- handler.index(nameKey, Operation.INDEX, Optional.empty());
+ handler.index(nameKey, Operation.INDEX, Optional.empty()).get(10, SECONDS);
assertThat(Context.isForwardedEvent()).isFalse();
verify(indexerMock).index(nameKey);
@@ -93,7 +94,8 @@
assertThat(Context.isForwardedEvent()).isFalse();
IOException thrown =
assertThrows(
- IOException.class, () -> handler.index(nameKey, Operation.INDEX, Optional.empty()));
+ IOException.class,
+ () -> handler.index(nameKey, Operation.INDEX, Optional.empty()).get(10, SECONDS));
assertThat(thrown).hasMessageThat().isEqualTo("someMessage");
assertThat(Context.isForwardedEvent()).isFalse();
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java
new file mode 100644
index 0000000..2f5c5b7
--- /dev/null
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java
@@ -0,0 +1,147 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.ericsson.gerrit.plugins.highavailability.cache.Constants;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.CacheKeyJsonParser;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gson.Gson;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that the Gson instance built by {@link JGroupsForwarderModule} maps JSON payloads onto
+ * the expected {@link Command} implementations and populates their fields.
+ */
+public class CommandDeserializerTest {
+
+  private Gson jgroupsGson;
+  private CacheKeyJsonParser keyParser;
+
+  /** Builds the Gson under test and a cache-key parser on top of the plain event Gson. */
+  @Before
+  public void setUp() {
+    Gson baseGson = new EventGsonProvider().get();
+    jgroupsGson = new JGroupsForwarderModule().buildJGroupsGson(baseGson);
+    keyParser = new CacheKeyJsonParser(baseGson);
+  }
+
+  /** Deserializes {@code json} through the Gson under test, asking for a {@link Command}. */
+  private Command parseCommand(String json) {
+    return jgroupsGson.fromJson(json, Command.class);
+  }
+
+  @Test
+  public void indexAccount() {
+    Command parsed = parseCommand("{type: 'index-account', id: 100}");
+
+    assertThat(parsed).isInstanceOf(IndexAccount.class);
+    assertThat(((IndexAccount) parsed).getId()).isEqualTo(100);
+  }
+
+  @Test
+  public void updateChangeCommand() {
+    Command parsed = parseCommand("{type: 'update-change', projectName: 'foo', id: 100}");
+
+    assertThat(parsed).isInstanceOf(IndexChange.Update.class);
+    IndexChange.Update updateCommand = (IndexChange.Update) parsed;
+    assertThat(updateCommand.getId()).isEqualTo("foo~100");
+    assertThat(updateCommand.isBatch()).isFalse();
+  }
+
+  @Test
+  public void batchUpdateChangeCommand() {
+    Command parsed =
+        parseCommand("{type: 'update-change', projectName: 'foo', id: 100, batchMode: 'true'}");
+
+    assertThat(parsed).isInstanceOf(IndexChange.Update.class);
+    IndexChange.Update updateCommand = (IndexChange.Update) parsed;
+    assertThat(updateCommand.getId()).isEqualTo("foo~100");
+    assertThat(updateCommand.isBatch()).isTrue();
+  }
+
+  @Test
+  public void deleteChangeCommand() {
+    // Payload without a project name: the id keeps an empty project prefix.
+    Command withoutProject = parseCommand("{type: 'delete-change', id: 100}");
+    assertThat(withoutProject).isInstanceOf(IndexChange.Delete.class);
+    assertThat(((IndexChange.Delete) withoutProject).getId()).isEqualTo("~100");
+
+    // Payload with a project name: the id is prefixed with it.
+    Command withProject = parseCommand("{type: 'delete-change', projectName: 'foo', id: 100}");
+    assertThat(withProject).isInstanceOf(IndexChange.Delete.class);
+    assertThat(((IndexChange.Delete) withProject).getId()).isEqualTo("foo~100");
+  }
+
+  @Test
+  public void indexGroup() {
+    Command parsed = parseCommand("{type: 'index-group', uuid: 'foo'}");
+
+    assertThat(parsed).isInstanceOf(IndexGroup.class);
+    assertThat(((IndexGroup) parsed).getUuid()).isEqualTo("foo");
+  }
+
+  @Test
+  public void indexProject() {
+    Command parsed = parseCommand("{type: 'index-project', projectName: 'foo'}");
+
+    assertThat(parsed).isInstanceOf(IndexProject.class);
+    assertThat(((IndexProject) parsed).getProjectName()).isEqualTo("foo");
+  }
+
+  @Test
+  public void postEvent() {
+    String payload =
+        "{event: {projectName : 'foo', headName : 'refs/heads/master', type :"
+            + " 'project-created', eventCreatedOn:1505898779}, type : 'post-event'}";
+
+    Command parsed = parseCommand(payload);
+
+    assertThat(parsed).isInstanceOf(PostEvent.class);
+    Event embeddedEvent = ((PostEvent) parsed).getEvent();
+    assertThat(embeddedEvent).isInstanceOf(ProjectCreatedEvent.class);
+    assertThat(((ProjectCreatedEvent) embeddedEvent).projectName).isEqualTo("foo");
+  }
+
+  @Test
+  public void evictCache() {
+    Project.NameKey expectedKey = Project.nameKey("foo");
+    String payload =
+        String.format(
+            "{type: '%s', cacheName: '%s', keyJson: '%s'}",
+            EvictCache.TYPE, Constants.PROJECTS, jgroupsGson.toJson(expectedKey));
+
+    // NOTE(review): unlike the other tests, this one asks Gson for EvictCache.class directly
+    // rather than Command.class, so the instance check below cannot fail - confirm whether
+    // Command.class was intended here.
+    Command parsed = jgroupsGson.fromJson(payload, EvictCache.class);
+    assertThat(parsed).isInstanceOf(EvictCache.class);
+    EvictCache evictCommand = (EvictCache) parsed;
+
+    // The key JSON carried by the command must parse back into the original project key.
+    Object restoredKey = keyParser.fromJson(Constants.PROJECTS, evictCommand.getKeyJson());
+    assertThat(restoredKey).isInstanceOf(Project.NameKey.class);
+    assertThat(restoredKey).isEqualTo(expectedKey);
+  }
+
+  @Test
+  public void addToProjectList() {
+    Command parsed = parseCommand("{type: 'add-to-project-list', projectName: 'foo'}");
+
+    assertThat(parsed).isInstanceOf(AddToProjectList.class);
+    assertThat(((AddToProjectList) parsed).getProjectName()).isEqualTo("foo");
+  }
+
+  @Test
+  public void removeFromProjectList() {
+    Command parsed = parseCommand("{type: 'remove-from-project-list', projectName: 'foo'}");
+
+    assertThat(parsed).isInstanceOf(RemoveFromProjectList.class);
+    assertThat(((RemoveFromProjectList) parsed).getProjectName()).isEqualTo("foo");
+  }
+}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java
new file mode 100644
index 0000000..082b5d1
--- /dev/null
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java
@@ -0,0 +1,109 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gson.Gson;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.jgroups.Address;
+import org.jgroups.blocks.MessageDispatcher;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.jgroups.util.UUID;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises the retry handling of {@link JGroupsForwarder} against a mocked {@link
+ * MessageDispatcher} whose channel view reports two members.
+ *
+ * <p>The forwarder runs on a real scheduled executor handed out by a mocked {@link WorkQueue}; the
+ * executor is shut down after every test so that no worker threads outlive the test.
+ */
+public class JGroupsForwarderTest {
+
+  private static final int MAX_TRIES = 3;
+  private static final int THREAD_POOLS_SIZE = 4;
+
+  /** Upper bound for waiting on a forwarding result, so a stuck future fails instead of hanging. */
+  private static final int TIMEOUT_SECONDS = 10;
+
+  private static final Address A1 = new UUID(1, 1);
+  private static final Address A2 = new UUID(2, 2);
+  private static final Rsp<Object> RSP_OK = new Rsp<>(true);
+  private static final Rsp<Object> RSP_FAIL = new Rsp<>(false);
+
+  private MessageDispatcher dispatcher;
+  private JGroupsForwarder forwarder;
+  private ScheduledExecutorService executor;
+
+  /**
+   * Wires a {@link JGroupsForwarder} with {@link #MAX_TRIES} attempts, a 1 ms retry interval and a
+   * dispatcher whose view contains {@link #A1} and {@link #A2}.
+   */
+  @Before
+  public void setUp() throws Exception {
+    Gson eventGson = new EventGsonProvider().get();
+    Gson gson = new JGroupsForwarderModule().buildJGroupsGson(eventGson);
+    Configuration cfg = mock(Configuration.class, RETURNS_DEEP_STUBS);
+    when(cfg.jgroups().maxTries()).thenReturn(MAX_TRIES);
+    when(cfg.jgroups().retryInterval()).thenReturn(Duration.ofMillis(1));
+    when(cfg.jgroups().threadPoolSize()).thenReturn(THREAD_POOLS_SIZE);
+
+    dispatcher = mock(MessageDispatcher.class, RETURNS_DEEP_STUBS);
+    when(dispatcher.getChannel().getView().size()).thenReturn(2);
+    when(dispatcher.getChannel().getView().getMembers()).thenReturn(List.of(A1, A2));
+
+    // Keep a handle on the executor so tearDown() can release its threads.
+    executor = Executors.newScheduledThreadPool(THREAD_POOLS_SIZE);
+    WorkQueue workQueue = mock(WorkQueue.class);
+    when(workQueue.createQueue(THREAD_POOLS_SIZE, "JGroupsForwarder")).thenReturn(executor);
+    forwarder =
+        new JGroupsForwarder(
+            dispatcher, cfg, gson, new FailsafeExecutorProvider(cfg, workQueue).get());
+  }
+
+  /** Stops the executor created in {@link #setUp()} so worker threads do not leak across tests. */
+  @After
+  public void tearDown() {
+    // JUnit runs @After even when setUp() failed part-way, hence the null guard.
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+
+  /** Both members answer OK on the first attempt: one cast, result {@code true}. */
+  @Test
+  public void castMessageOK_returnsTrue() throws Exception {
+    RspList<Object> allOk = new RspList<>(Map.of(A1, RSP_OK, A2, RSP_OK));
+    when(dispatcher.castMessage(any(), any(), any())).thenReturn(allOk);
+
+    CompletableFuture<Boolean> result = forwarder.indexAccount(100, null);
+    assertThat(result.get(TIMEOUT_SECONDS, SECONDS)).isTrue();
+    verify(dispatcher, times(1)).castMessage(any(), any(), any());
+  }
+
+  /** One member fails on the first attempt and succeeds on the second: two casts, {@code true}. */
+  @SuppressWarnings("unchecked") // generic varargs array created for thenReturn(first, rest...)
+  @Test
+  public void castMessageRetriesWithSucess_returnsTrue() throws Exception {
+    RspList<Object> allOk = new RspList<>(Map.of(A1, RSP_OK, A2, RSP_OK));
+    RspList<Object> oneFailed = new RspList<>(Map.of(A1, RSP_OK, A2, RSP_FAIL));
+    when(dispatcher.castMessage(any(), any(), any())).thenReturn(oneFailed, allOk);
+
+    CompletableFuture<Boolean> result = forwarder.indexAccount(100, null);
+    assertThat(result.get(TIMEOUT_SECONDS, SECONDS)).isTrue();
+    verify(dispatcher, times(2)).castMessage(any(), any(), any());
+  }
+
+  /** Every attempt fails: exactly {@link #MAX_TRIES} casts are made and the result is false. */
+  @Test
+  public void castMessageFailsMaxTriesTimes_returnsFalse() throws Exception {
+    RspList<Object> allFailed = new RspList<>(Map.of(A1, RSP_FAIL, A2, RSP_FAIL));
+    // A single stubbed value is returned for every call, so the stubbing does not have to be
+    // kept in sync with MAX_TRIES.
+    when(dispatcher.castMessage(any(), any(), any())).thenReturn(allFailed);
+
+    CompletableFuture<Boolean> result = forwarder.indexAccount(100, null);
+    assertThat(result.get(TIMEOUT_SECONDS, SECONDS)).isFalse();
+    verify(dispatcher, times(MAX_TRIES)).castMessage(any(), any(), any());
+  }
+}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java
new file mode 100644
index 0000000..c3114de
--- /dev/null
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java
@@ -0,0 +1,208 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheEntry;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheNotFoundException;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedCacheEvictionHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedEventHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexAccountHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexBatchChangeHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexChangeHandler;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedProjectListUpdateHandler;
+import com.google.gerrit.entities.Account;
+import com.google.gerrit.entities.Change;
+import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGsonProvider;
+import com.google.gerrit.server.events.EventTypes;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.jgroups.ObjectMessage;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Verifies that {@link MessageProcessor} hands every JSON-encoded command to the matching
+ * forwarded-event handler and leaves all other handlers untouched.
+ */
+public class MessageProcessorTest {
+
+  private MessageProcessor processor;
+  private Gson gson;
+
+  private ForwardedIndexChangeHandler indexChangeHandler;
+  private ForwardedIndexBatchChangeHandler indexBatchChangeHandler;
+  private ForwardedIndexAccountHandler indexAccountHandler;
+  private ForwardedCacheEvictionHandler cacheEvictionHandler;
+  private ForwardedEventHandler eventHandler;
+  private ForwardedProjectListUpdateHandler projectListUpdateHandler;
+
+  // Every mock built by createHandlerMock(), so tests can assert the others stay idle.
+  private List<Object> allHandlers = new ArrayList<>();
+
+  @Before
+  public void setUp() {
+    gson = new JGroupsForwarderModule().buildJGroupsGson(new EventGsonProvider().get());
+
+    indexChangeHandler = createHandlerMock(ForwardedIndexChangeHandler.class);
+    indexBatchChangeHandler = createHandlerMock(ForwardedIndexBatchChangeHandler.class);
+    indexAccountHandler = createHandlerMock(ForwardedIndexAccountHandler.class);
+    cacheEvictionHandler = createHandlerMock(ForwardedCacheEvictionHandler.class);
+    eventHandler = createHandlerMock(ForwardedEventHandler.class);
+    projectListUpdateHandler = createHandlerMock(ForwardedProjectListUpdateHandler.class);
+
+    processor =
+        new MessageProcessor(
+            gson,
+            indexChangeHandler,
+            indexBatchChangeHandler,
+            indexAccountHandler,
+            cacheEvictionHandler,
+            eventHandler,
+            projectListUpdateHandler);
+  }
+
+  /** Creates a Mockito mock of the given handler type and records it in allHandlers. */
+  private <T> T createHandlerMock(Class<T> handlerClass) {
+    T created = mock(handlerClass);
+    allHandlers.add(created);
+    return created;
+  }
+
+  /** Sends the command to the processor as a JSON payload and expects {@code true} back. */
+  private void assertHandled(Object command) {
+    ObjectMessage message = new ObjectMessage(null, gson.toJson(command));
+    assertThat(processor.handle(message)).isEqualTo(true);
+  }
+
+  @Test
+  public void indexAccount() throws IOException {
+    int accountId = 100;
+    assertHandled(new IndexAccount(accountId));
+    verify(indexAccountHandler, times(1))
+        .index(Account.id(accountId), Operation.INDEX, Optional.empty());
+    verifyOtherHandlersNotUsed(indexAccountHandler);
+  }
+
+  @Test
+  public void indexChange() throws IOException {
+    String project = "foo";
+    int changeId = 100;
+    assertHandled(new IndexChange.Update(project, changeId));
+    verify(indexChangeHandler, times(1))
+        .index(project + "~" + Change.id(changeId), Operation.INDEX, Optional.empty());
+    verifyOtherHandlersNotUsed(indexChangeHandler);
+  }
+
+  @Test
+  public void indexChangeBatchMode() throws IOException {
+    String project = "foo";
+    int changeId = 100;
+    // With the third constructor argument set to true the batch handler is the expected target.
+    assertHandled(new IndexChange.Update(project, changeId, true));
+    verify(indexBatchChangeHandler, times(1))
+        .index(project + "~" + Change.id(changeId), Operation.INDEX, Optional.empty());
+    verifyOtherHandlersNotUsed(indexBatchChangeHandler);
+  }
+
+  @Test
+  public void deleteChangeFromIndex() throws IOException {
+    String project = "foo";
+    int changeId = 100;
+    assertHandled(new IndexChange.Delete(project, changeId));
+    verify(indexChangeHandler, times(1))
+        .index(project + "~" + Change.id(changeId), Operation.DELETE, Optional.empty());
+    verifyOtherHandlersNotUsed(indexChangeHandler);
+  }
+
+  @Test
+  public void evictCache() throws CacheNotFoundException {
+    String cache = "foo";
+    String keyJson = gson.toJson(100);
+    assertHandled(new EvictCache(cache, keyJson));
+    CacheEntry expectedEntry = CacheEntry.from(cache, keyJson);
+    verify(cacheEvictionHandler, times(1)).evict(expectedEntry);
+    verifyOtherHandlersNotUsed(cacheEvictionHandler);
+  }
+
+  @Test
+  public void postEvent() throws PermissionBackendException {
+    String foo = "foo";
+    int bar = 100;
+    EventTypes.register(TestEvent.TYPE, TestEvent.class);
+    assertHandled(new PostEvent(new TestEvent(foo, bar)));
+
+    // The dispatched event must be a TestEvent carrying the original field values.
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(1)).dispatch(captor.capture());
+    assertThat(captor.getValue()).isInstanceOf(TestEvent.class);
+    TestEvent dispatched = (TestEvent) captor.getValue();
+    assertThat(dispatched.foo).isEqualTo(foo);
+    assertThat(dispatched.bar).isEqualTo(bar);
+    verifyOtherHandlersNotUsed(eventHandler);
+  }
+
+  @Test
+  public void addToProjectList() throws IOException {
+    String project = "foo";
+    assertHandled(new AddToProjectList(project));
+    verify(projectListUpdateHandler, times(1)).update(project, false);
+    verifyOtherHandlersNotUsed(projectListUpdateHandler);
+  }
+
+  @Test
+  public void removeFromProjectList() throws IOException {
+    String project = "foo";
+    assertHandled(new RemoveFromProjectList(project));
+    verify(projectListUpdateHandler, times(1)).update(project, true);
+    verifyOtherHandlersNotUsed(projectListUpdateHandler);
+  }
+
+  /** Asserts that no handler mock other than {@code onlyUsedHandler} was interacted with. */
+  private void verifyOtherHandlersNotUsed(Object onlyUsedHandler) {
+    for (Object candidate : allHandlers) {
+      if (candidate == onlyUsedHandler) {
+        continue;
+      }
+      verifyNoInteractions(candidate);
+    }
+  }
+
+  /** Event with two payload fields that postEvent() expects to reach the handler unchanged. */
+  private static class TestEvent extends Event {
+    static final String TYPE = "test-event";
+
+    String foo;
+    int bar;
+
+    TestEvent(String foo, int bar) {
+      super(TYPE);
+      this.foo = foo;
+      this.bar = bar;
+    }
+  }
+}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProviderTest.java
index 5f66550..5391d42 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpClientProviderTest.java
@@ -22,6 +22,7 @@
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Scopes;
+import java.time.Duration;
import org.apache.http.impl.client.CloseableHttpClient;
import org.junit.Before;
import org.junit.Test;
@@ -32,7 +33,7 @@
@RunWith(MockitoJUnitRunner.class)
public class HttpClientProviderTest {
- private static final int TIME_INTERVAL = 1000;
+ private static final Duration TIME_INTERVAL = Duration.ofMillis(1000);
private static final String EMPTY = "";
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
index 1edcd43..4d874dd 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
@@ -30,6 +30,7 @@
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import com.google.gson.Gson;
import java.net.SocketTimeoutException;
+import java.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -38,8 +39,8 @@
public class HttpSessionTest {
private static final int MAX_TRIES = 3;
- private static final int RETRY_INTERVAL = 250;
- private static final int TIMEOUT = 500;
+ private static final Duration RETRY_INTERVAL = Duration.ofMillis(250);
+ private static final Duration TIMEOUT = Duration.ofMillis(500);
private static final int ERROR = 500;
private static final int NO_CONTENT = 204;
private static final int NOT_FOUND = 404;
@@ -141,24 +142,24 @@
.inScenario(RETRY_AT_DELAY)
.whenScenarioStateIs(Scenario.STARTED)
.willSetStateTo(REQUEST_MADE)
- .willReturn(aResponse().withFixedDelay(TIMEOUT)));
+ .willReturn(aResponse().withFixedDelay((int) TIMEOUT.toMillis())));
wireMockRule.givenThat(
post(urlEqualTo(ENDPOINT))
.inScenario(RETRY_AT_DELAY)
.whenScenarioStateIs(REQUEST_MADE)
.willSetStateTo(SECOND_TRY)
- .willReturn(aResponse().withFixedDelay(TIMEOUT)));
+ .willReturn(aResponse().withFixedDelay((int) TIMEOUT.toMillis())));
wireMockRule.givenThat(
post(urlEqualTo(ENDPOINT))
.inScenario(RETRY_AT_DELAY)
.whenScenarioStateIs(SECOND_TRY)
.willSetStateTo(THIRD_TRY)
- .willReturn(aResponse().withFixedDelay(TIMEOUT)));
+ .willReturn(aResponse().withFixedDelay((int) TIMEOUT.toMillis())));
wireMockRule.givenThat(
post(urlEqualTo(ENDPOINT))
.inScenario(RETRY_AT_DELAY)
.whenScenarioStateIs(THIRD_TRY)
- .willReturn(aResponse().withFixedDelay(TIMEOUT)));
+ .willReturn(aResponse().withFixedDelay((int) TIMEOUT.toMillis())));
httpSession.post(uri);
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index 7908669..8a2745e 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -33,9 +33,11 @@
import com.google.gerrit.entities.Account;
import com.google.gerrit.entities.AccountGroup;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.git.WorkQueue;
import com.google.gson.Gson;
import com.google.inject.Provider;
import java.io.IOException;
+import java.time.Duration;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -105,9 +107,13 @@
httpSessionMock = mock(HttpSession.class);
configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
when(configMock.http().maxTries()).thenReturn(3);
- when(configMock.http().retryInterval()).thenReturn(10);
+ when(configMock.http().retryInterval()).thenReturn(Duration.ofMillis(10));
+ when(configMock.http().threadPoolSize()).thenReturn(2);
peersMock = mock(Provider.class);
when(peersMock.get()).thenReturn(ImmutableSet.of(new PeerInfo(URL)));
+ WorkQueue workQueue = mock(WorkQueue.class);
+ when(workQueue.createQueue(configMock.http().threadPoolSize(), "RestForwarderScheduler"))
+ .thenReturn(Executors.newScheduledThreadPool(2));
forwarder =
new RestForwarder(
httpSessionMock,
@@ -115,7 +121,7 @@
configMock,
peersMock,
gson, // TODO: Create provider
- new RestForwarderScheduler(Executors.newScheduledThreadPool(1)));
+ new FailsafeExecutorProvider(configMock).get());
}
@Test
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java
index 38f71e5..2cf3fad 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java
@@ -19,7 +19,7 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
import com.google.gerrit.entities.Change;
-import com.google.gerrit.server.CommentsUtil;
+import com.google.gerrit.server.DraftCommentsReader;
import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.notedb.ChangeNotes;
@@ -38,7 +38,7 @@
public class ChangeCheckerImplTest {
@Mock private GitRepositoryManager gitRepoMgr;
- @Mock private CommentsUtil commentsUtil;
+ @Mock private DraftCommentsReader draftCommentsReader;
@Mock private ChangeFinder changeFinder;
@Mock private OneOffRequestContext oneOffReqCtx;
@Mock private ChangeNotes testChangeNotes;
@@ -53,7 +53,8 @@
@Before
public void setUp() {
changeChecker =
- new ChangeCheckerImpl(gitRepoMgr, commentsUtil, changeFinder, oneOffReqCtx, changeId);
+ new ChangeCheckerImpl(
+ gitRepoMgr, draftCommentsReader, changeFinder, oneOffReqCtx, changeId);
}
@Test
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 7ed5f55..a8aa4a9 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -14,13 +14,10 @@
package com.ericsson.gerrit.plugins.highavailability.index;
-import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@@ -29,56 +26,27 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.DeleteChangeTask;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexAccountTask;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexChangeTask;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexGroupTask;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexTask;
-import com.ericsson.gerrit.plugins.highavailability.index.IndexEventLocks.VoidFunction;
import com.google.gerrit.entities.Account;
import com.google.gerrit.entities.AccountGroup;
import com.google.gerrit.entities.Change;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
-import java.util.Collection;
-import java.util.List;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
-import java.util.function.Supplier;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
-@Ignore
public class IndexEventHandlerTest {
- private static final String PLUGIN_NAME = "high-availability";
private static final String PROJECT_NAME = "test/project";
private static final int CHANGE_ID = 1;
private static final int ACCOUNT_ID = 2;
private static final String UUID = "3";
- private static final String OTHER_UUID = "4";
- private static final Integer INDEX_WAIT_TIMEOUT_MS = 5;
- private static final int MAX_TEST_PARALLELISM = 4;
- private static final String EXECUTOR_THREAD_NAME = "EXECUTOR_THREAD";
private IndexEventHandler indexEventHandler;
@Mock private Forwarder forwarder;
@@ -87,14 +55,8 @@
private Change.Id changeId;
private Account.Id accountId;
private AccountGroup.UUID accountGroupUUID;
- private ScheduledExecutorService executor = new CurrentThreadScheduledExecutorService();
- private ScheduledExecutorService batchExecutor = new CurrentThreadScheduledExecutorService();
- private ScheduledExecutorService testExecutor =
- Executors.newScheduledThreadPool(MAX_TEST_PARALLELISM);
@Mock private RequestContext mockCtx;
@Mock private Configuration configuration;
- private IndexEventLocks idLocks;
- private Thread executorThread;
private CurrentRequestContext currCtx =
new CurrentRequestContext(null, null, null) {
@@ -112,14 +74,6 @@
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
- Configuration.Index cfgIndex = mock(Configuration.Index.class);
- when(configuration.index()).thenReturn(cfgIndex);
- when(cfgIndex.numStripedLocks()).thenReturn(Configuration.DEFAULT_NUM_STRIPED_LOCKS);
-
- Configuration.Http http = mock(Configuration.Http.class);
- when(configuration.http()).thenReturn(http);
- when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
- when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
when(forwarder.indexAccount(eq(ACCOUNT_ID), any()))
.thenReturn(CompletableFuture.completedFuture(true));
when(forwarder.deleteChangeFromIndex(eq(CHANGE_ID), any()))
@@ -128,38 +82,16 @@
when(forwarder.indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any()))
.thenReturn(CompletableFuture.completedFuture(true));
- idLocks = new IndexEventLocks(configuration);
setUpIndexEventHandler(currCtx);
}
public void setUpIndexEventHandler(CurrentRequestContext currCtx) throws Exception {
- setUpIndexEventHandler(currCtx, idLocks, configuration);
- }
-
- public void setUpIndexEventHandler(CurrentRequestContext currCtx, IndexEventLocks idLocks)
- throws Exception {
- setUpIndexEventHandler(currCtx, idLocks, configuration);
- }
-
- public void setUpIndexEventHandler(
- CurrentRequestContext currCtx, IndexEventLocks idLocks, Configuration configuration)
- throws Exception {
- indexEventHandler =
- new IndexEventHandler(
- executor,
- batchExecutor,
- PLUGIN_NAME,
- forwarder,
- changeCheckerFactoryMock,
- currCtx,
- configuration,
- idLocks);
+ indexEventHandler = new IndexEventHandler(forwarder, changeCheckerFactoryMock, currCtx);
}
@Test
public void shouldIndexInRemoteOnChangeIndexedEvent() throws Exception {
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- executorThread.join();
verify(forwarder).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@@ -177,160 +109,6 @@
}
@Test
- public void shouldNotIndexChangeWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
-
- verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
- }
-
- @Test
- public void shouldNotIndexAccountWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onAccountIndexed(accountId.get());
-
- verify(forwarder, never()).indexAccount(eq(ACCOUNT_ID), any());
- }
-
- @Test
- public void shouldNotDeleteChangeWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onChangeDeleted(changeId.get());
-
- verify(forwarder, never()).deleteChangeFromIndex(eq(CHANGE_ID), any());
- }
-
- @Test
- public void shouldNotIndexGroupWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onGroupIndexed(accountGroupUUID.get());
-
- verify(forwarder, never()).indexGroup(eq(UUID), any());
- }
-
- @Test
- public void shouldNotIndexProjectWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onProjectIndexed(PROJECT_NAME);
-
- verify(forwarder, never()).indexProject(eq(PROJECT_NAME), any());
- }
-
- @Test
- public void shouldRetryIndexChangeWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false, true);
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- executorThread.join();
- verify(locks, times(2)).withLock(any(), any(), any());
- verify(forwarder, times(1)).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
- }
-
- @Test
- public void shouldRetryUpToMaxTriesWhenCannotAcquireLock() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
-
- Configuration cfg = mock(Configuration.class);
- Configuration.Http httpCfg = mock(Configuration.Http.class);
- when(httpCfg.maxTries()).thenReturn(10);
- when(cfg.http()).thenReturn(httpCfg);
- setUpIndexEventHandler(currCtx, locks, cfg);
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- executorThread.join();
- verify(locks, times(11)).withLock(any(), any(), any());
- verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
- }
-
- @Test
- public void shouldNotRetryWhenMaxTriesLowerThanOne() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore semaphore = mock(Semaphore.class);
- when(locks.getSemaphore(anyString())).thenReturn(semaphore);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
-
- Configuration cfg = mock(Configuration.class);
- Configuration.Http httpCfg = mock(Configuration.Http.class);
- when(httpCfg.maxTries()).thenReturn(0);
- when(cfg.http()).thenReturn(httpCfg);
- setUpIndexEventHandler(currCtx, locks, cfg);
-
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- executorThread.join();
- verify(locks, times(1)).withLock(any(), any(), any());
- verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
- }
-
- @Test
- public void shouldLockPerIndexEventType() throws Exception {
- IndexEventLocks locks = mock(IndexEventLocks.class);
- Semaphore indexChangeLock = mock(Semaphore.class);
- when(indexChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(false);
- Semaphore accountChangeLock = mock(Semaphore.class);
- when(accountChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
- .thenReturn(true);
- when(locks.getSemaphore(eq("change/" + CHANGE_ID))).thenReturn(indexChangeLock);
- when(locks.getSemaphore(eq("account/" + ACCOUNT_ID))).thenReturn(accountChangeLock);
- Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- setUpIndexEventHandler(currCtx, locks);
-
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- executorThread.join();
- indexEventHandler.onAccountIndexed(accountId.get());
- executorThread.join();
- verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
- verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
- }
-
- @Test
public void shouldReindexInRemoteWhenContextIsMissingButForcedIndexingEnabled() throws Exception {
ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
@@ -341,21 +119,18 @@
setUpIndexEventHandler(new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- executorThread.join();
verify(forwarder).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldIndexInRemoteOnAccountIndexedEvent() throws Exception {
indexEventHandler.onAccountIndexed(accountId.get());
- executorThread.join();
verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
}
@Test
public void shouldDeleteFromIndexInRemoteOnChangeDeletedEvent() throws Exception {
indexEventHandler.onChangeDeleted(changeId.get());
- executorThread.join();
verify(forwarder).deleteChangeFromIndex(eq(CHANGE_ID), any());
verifyNoInteractions(changeCheckerMock); // Deleted changes should not be checked against NoteDb
}
@@ -363,7 +138,6 @@
@Test
public void shouldIndexInRemoteOnGroupIndexedEvent() throws Exception {
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
- executorThread.join();
verify(forwarder).indexGroup(eq(UUID), any());
}
@@ -393,396 +167,4 @@
Context.unsetForwardedEvent();
verifyNoInteractions(forwarder);
}
-
- @Test
- public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
- ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
- ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
- indexEventHandler =
- new IndexEventHandler(
- poolMock,
- poolBatchMock,
- PLUGIN_NAME,
- forwarder,
- changeCheckerFactoryMock,
- currCtx,
- configuration,
- idLocks);
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
- verify(poolMock, times(1))
- .execute(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null));
- }
-
- @Test
- public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
- ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
- ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
- indexEventHandler =
- new IndexEventHandler(
- poolMock,
- poolBatchMock,
- PLUGIN_NAME,
- forwarder,
- changeCheckerFactoryMock,
- currCtx,
- configuration,
- idLocks);
- indexEventHandler.onAccountIndexed(accountId.get());
- indexEventHandler.onAccountIndexed(accountId.get());
- verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
- }
-
- @Test
- public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
- ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
- ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
- indexEventHandler =
- new IndexEventHandler(
- poolMock,
- poolBatchMock,
- PLUGIN_NAME,
- forwarder,
- changeCheckerFactoryMock,
- currCtx,
- configuration,
- idLocks);
- indexEventHandler.onGroupIndexed(accountGroupUUID.get());
- indexEventHandler.onGroupIndexed(accountGroupUUID.get());
- verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));
- }
-
- @Test
- public void testIndexChangeTaskToString() throws Exception {
- IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null);
- assertThat(task.toString())
- .isEqualTo(
- String.format("[%s] Index change %s in target instance", PLUGIN_NAME, CHANGE_ID));
- }
-
- @Test
- public void testIndexAccountTaskToString() throws Exception {
- IndexAccountTask task = indexEventHandler.new IndexAccountTask(ACCOUNT_ID);
- assertThat(task.toString())
- .isEqualTo(
- String.format("[%s] Index account %s in target instance", PLUGIN_NAME, ACCOUNT_ID));
- }
-
- @Test
- public void testIndexGroupTaskToString() throws Exception {
- IndexGroupTask task = indexEventHandler.new IndexGroupTask(UUID);
- assertThat(task.toString())
- .isEqualTo(String.format("[%s] Index group %s in target instance", PLUGIN_NAME, UUID));
- }
-
- @Test
- public void testIndexChangeTaskHashCodeAndEquals() {
- IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null);
-
- IndexChangeTask sameTask = task;
- assertThat(task.equals(sameTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
-
- IndexChangeTask identicalTask =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null);
- assertThat(task.equals(identicalTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
-
- assertThat(task.equals(null)).isFalse();
- assertThat(
- task.equals(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID + 1, null)))
- .isFalse();
- assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
-
- IndexChangeTask differentChangeIdTask =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, 123, null);
- assertThat(task.equals(differentChangeIdTask)).isFalse();
- assertThat(task.hashCode()).isNotEqualTo(differentChangeIdTask.hashCode());
- }
-
- @Test
- public void testDeleteChangeTaskHashCodeAndEquals() {
- DeleteChangeTask task = indexEventHandler.new DeleteChangeTask(CHANGE_ID, null);
-
- DeleteChangeTask sameTask = task;
- assertThat(task.equals(sameTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
-
- DeleteChangeTask identicalTask = indexEventHandler.new DeleteChangeTask(CHANGE_ID, null);
- assertThat(task.equals(identicalTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
-
- assertThat(task.equals(null)).isFalse();
- assertThat(task.equals(indexEventHandler.new DeleteChangeTask(CHANGE_ID + 1, null))).isFalse();
- assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
-
- DeleteChangeTask differentChangeIdTask = indexEventHandler.new DeleteChangeTask(123, null);
- assertThat(task.equals(differentChangeIdTask)).isFalse();
- assertThat(task.hashCode()).isNotEqualTo(differentChangeIdTask.hashCode());
- }
-
- @Test
- public void testIndexAccountTaskHashCodeAndEquals() {
- IndexAccountTask task = indexEventHandler.new IndexAccountTask(ACCOUNT_ID);
-
- IndexAccountTask sameTask = task;
- assertThat(task.equals(sameTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
-
- IndexAccountTask identicalTask = indexEventHandler.new IndexAccountTask(ACCOUNT_ID);
- assertThat(task.equals(identicalTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
-
- assertThat(task.equals(null)).isFalse();
- assertThat(task.equals(indexEventHandler.new IndexAccountTask(ACCOUNT_ID + 1))).isFalse();
- assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
-
- IndexAccountTask differentAccountIdTask = indexEventHandler.new IndexAccountTask(123);
- assertThat(task.equals(differentAccountIdTask)).isFalse();
- assertThat(task.hashCode()).isNotEqualTo(differentAccountIdTask.hashCode());
- }
-
- @Test
- public void testIndexGroupTaskHashCodeAndEquals() {
- IndexGroupTask task = indexEventHandler.new IndexGroupTask(UUID);
-
- IndexGroupTask sameTask = task;
- assertThat(task.equals(sameTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
-
- IndexGroupTask identicalTask = indexEventHandler.new IndexGroupTask(UUID);
- assertThat(task.equals(identicalTask)).isTrue();
- assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
-
- assertThat(task.equals(null)).isFalse();
- assertThat(task.equals(indexEventHandler.new IndexGroupTask(OTHER_UUID))).isFalse();
- assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
-
- IndexGroupTask differentGroupIdTask = indexEventHandler.new IndexGroupTask("123");
- assertThat(task.equals(differentGroupIdTask)).isFalse();
- assertThat(task.hashCode()).isNotEqualTo(differentGroupIdTask.hashCode());
- }
-
- class TestTask<T> implements Runnable {
- private IndexTask task;
- private CyclicBarrier testBarrier;
- private Supplier<T> successFunc;
- private VoidFunction failureFunc;
- private CompletableFuture<T> future;
-
- public TestTask(
- IndexTask task,
- CyclicBarrier testBarrier,
- Supplier<T> successFunc,
- VoidFunction failureFunc) {
- this.task = task;
- this.testBarrier = testBarrier;
- this.successFunc = successFunc;
- this.failureFunc = failureFunc;
- this.future = new CompletableFuture<>();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void run() {
- try {
- testBarrier.await();
- idLocks
- .withLock(
- task,
- () ->
- runLater(
- INDEX_WAIT_TIMEOUT_MS * 2,
- () -> CompletableFuture.completedFuture(successFunc.get())),
- failureFunc)
- .whenComplete(
- (v, t) -> {
- if (t == null) {
- future.complete((T) v);
- } else {
- future.completeExceptionally(t);
- }
- });
- } catch (Throwable t) {
- future = new CompletableFuture<>();
- future.completeExceptionally(t);
- }
- }
-
- public void join() {
- try {
- future.join();
- } catch (Exception e) {
- }
- }
-
- private CompletableFuture<T> runLater(
- long scheduledTimeMsec, Supplier<CompletableFuture<T>> supplier) {
- CompletableFuture<T> resFuture = new CompletableFuture<>();
- testExecutor.schedule(
- () -> {
- try {
- return supplier
- .get()
- .whenComplete(
- (v, t) -> {
- if (t == null) {
- resFuture.complete(v);
- }
- resFuture.completeExceptionally(t);
- });
- } catch (Throwable t) {
- return resFuture.completeExceptionally(t);
- }
- },
- scheduledTimeMsec,
- TimeUnit.MILLISECONDS);
- return resFuture;
- }
- }
-
- @Test
- public void indexLocksShouldBlockConcurrentIndexChange() throws Exception {
- IndexChangeTask indexTask1 =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, new IndexEvent());
- IndexChangeTask indexTask2 =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, new IndexEvent());
- testIsolationOfCuncurrentIndexTasks(indexTask1, indexTask2);
- }
-
- @Test
- public void indexLocksShouldBlockConcurrentIndexAndDeleteChange() throws Exception {
- IndexChangeTask indexTask =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, new IndexEvent());
- DeleteChangeTask deleteTask =
- indexEventHandler.new DeleteChangeTask(CHANGE_ID, new IndexEvent());
- testIsolationOfCuncurrentIndexTasks(indexTask, deleteTask);
- }
-
- private void testIsolationOfCuncurrentIndexTasks(IndexTask indexTask1, IndexTask indexTask2)
- throws Exception {
- AtomicInteger changeIndexedCount = new AtomicInteger();
- AtomicInteger lockFailedCounts = new AtomicInteger();
- CyclicBarrier changeThreadsSync = new CyclicBarrier(2);
-
- TestTask<Integer> task1 =
- new TestTask<>(
- indexTask1,
- changeThreadsSync,
- () -> changeIndexedCount.incrementAndGet(),
- () -> lockFailedCounts.incrementAndGet());
- TestTask<Integer> task2 =
- new TestTask<>(
- indexTask2,
- changeThreadsSync,
- () -> changeIndexedCount.incrementAndGet(),
- () -> lockFailedCounts.incrementAndGet());
-
- new Thread(task1).start();
- new Thread(task2).start();
- task1.join();
- task2.join();
-
- /* Both assertions needs to be true, the order doesn't really matter:
- * - Only one of the two tasks should succeed
- * - Only one of the two tasks should fail to acquire the lock
- */
- assertThat(changeIndexedCount.get()).isEqualTo(1);
- assertThat(lockFailedCounts.get()).isEqualTo(1);
- }
-
- private class CurrentThreadScheduledExecutorService implements ScheduledExecutorService {
- @Override
- public void shutdown() {}
-
- @Override
- public List<Runnable> shutdownNow() {
- return null;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return null;
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return null;
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- return null;
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- return null;
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(
- Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException {
- return null;
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- return null;
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return null;
- }
-
- @Override
- public void execute(Runnable command) {
- executorThread = new Thread(command);
- executorThread.setName(EXECUTOR_THREAD_NAME);
- executorThread.start();
- }
-
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- command.run();
- return null;
- }
-
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- return null;
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(
- Runnable command, long initialDelay, long period, TimeUnit unit) {
- return null;
- }
-
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(
- Runnable command, long initialDelay, long delay, TimeUnit unit) {
- return null;
- }
- }
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutorProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutorProviderTest.java
deleted file mode 100644
index abab0b9..0000000
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexExecutorProviderTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright (C) 2015 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.ericsson.gerrit.plugins.highavailability.index;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
-import com.google.gerrit.server.git.WorkQueue;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Answers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class IndexExecutorProviderTest {
- @Mock private ScheduledThreadPoolExecutor executorMock;
- private IndexExecutorProvider indexExecutorProvider;
-
- @Before
- public void setUp() throws Exception {
- executorMock = mock(ScheduledThreadPoolExecutor.class);
- WorkQueue workQueueMock = mock(WorkQueue.class);
- when(workQueueMock.createQueue(4, "Forward-Index-Event")).thenReturn(executorMock);
- Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
- when(configMock.index().threadPoolSize()).thenReturn(4);
- indexExecutorProvider = new IndexExecutorProvider(workQueueMock, configMock);
- }
-
- @Test
- public void shouldReturnExecutor() throws Exception {
- assertThat(indexExecutorProvider.get()).isEqualTo(executorMock);
- }
-
- @Test
- public void testStop() throws Exception {
- indexExecutorProvider.start();
- assertThat(indexExecutorProvider.get()).isEqualTo(executorMock);
- indexExecutorProvider.stop();
- verify(executorMock).shutdown();
- assertThat(indexExecutorProvider.get()).isNull();
- }
-}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
index 0dd27e1..fc8c29a 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
@@ -55,9 +55,9 @@
private Configuration pluginConfigurationMock;
@Mock private InetAddressFinder finder;
- private JGroupsPeerInfoProvider jGroupsPeerInfoProvider;
- private JGroupsPeerInfoProvider jGroupsPeerInfoProvider2;
- @Mock private MyUrlProvider myUrlProviderTest;
+ private JGroupsPeerInfoProvider firstJGroupsPeerInfoProvider;
+ private JGroupsPeerInfoProvider secondJGroupsPeerInfoProvider;
+ @Mock private MyUrlProvider myUrlProvider;
@Rule public WireMockRule kubeApiMock = new WireMockRule(options().port(48443));
@@ -93,12 +93,15 @@
} else {
when(finder.findAddress()).thenReturn(Optional.of(Inet4Address.getByName("127.0.0.1")));
}
- jGroupsPeerInfoProvider =
+ JChannelProvider channelProvider = new JChannelProvider(pluginConfigurationMock);
+ firstJGroupsPeerInfoProvider =
Mockito.spy(
- new JGroupsPeerInfoProvider(pluginConfigurationMock, finder, myUrlProviderTest));
- jGroupsPeerInfoProvider2 =
+ new JGroupsPeerInfoProvider(
+ pluginConfigurationMock, finder, myUrlProvider, channelProvider.get()));
+ secondJGroupsPeerInfoProvider =
Mockito.spy(
- new JGroupsPeerInfoProvider(pluginConfigurationMock, finder, myUrlProviderTest));
+ new JGroupsPeerInfoProvider(
+ pluginConfigurationMock, finder, myUrlProvider, channelProvider.get()));
StringBuilder kubeApiUrlBuilder = new StringBuilder();
kubeApiUrlBuilder.append("/api/v1/namespaces/");
@@ -116,16 +119,16 @@
aResponse()
.withJsonBody(new ObjectMapper().readTree(respJson))
.withStatus(HttpStatus.SC_OK)));
- jGroupsPeerInfoProvider.connect();
+ firstJGroupsPeerInfoProvider.connect();
verify(getRequestedFor(urlEqualTo(kubeApiUrl)));
- jGroupsPeerInfoProvider2.connect();
+ secondJGroupsPeerInfoProvider.connect();
- verify(jGroupsPeerInfoProvider, timeout(10000)).receive(any(Message.class));
+ verify(firstJGroupsPeerInfoProvider, timeout(10000)).receive(any(Message.class));
- assertThat(jGroupsPeerInfoProvider.get().isEmpty()).isFalse();
- assertThat(jGroupsPeerInfoProvider.get().size()).isEqualTo(1);
+ assertThat(firstJGroupsPeerInfoProvider.get()).isNotEmpty();
+ assertThat(firstJGroupsPeerInfoProvider.get()).hasSize(1);
- assertThat(jGroupsPeerInfoProvider2.get().isEmpty()).isFalse();
- assertThat(jGroupsPeerInfoProvider2.get().size()).isEqualTo(1);
+ assertThat(secondJGroupsPeerInfoProvider.get()).isNotEmpty();
+ assertThat(secondJGroupsPeerInfoProvider.get()).hasSize(1);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
index 1110fc3..fd203e0 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
@@ -54,8 +54,9 @@
@Before
public void setUp() throws Exception {
finder = new InetAddressFinder(pluginConfigurationMock);
+ JChannel channel = new JChannelProvider(pluginConfigurationMock).get();
jGroupsPeerInfoProvider =
- new JGroupsPeerInfoProvider(pluginConfigurationMock, finder, myUrlProviderTest);
+ new JGroupsPeerInfoProvider(pluginConfigurationMock, finder, myUrlProviderTest, channel);
peerInfo = Optional.of(new PeerInfo("test message"));
channel.setName("testChannel");
}
@@ -95,14 +96,6 @@
jGroupsPeerInfoProvider.viewAccepted(view);
}
- @Test(expected = NullPointerException.class)
- public void testViewAcceptedWithExceptionThrown() throws Exception {
- when(view.getMembers()).thenReturn(members);
- when(view.size()).thenReturn(2);
- when(members.size()).thenReturn(2);
- jGroupsPeerInfoProvider.viewAccepted(view);
- }
-
@Test
public void testViewAcceptedWhenPeerAddressIsNotNullAndIsNotMemberOfView() {
when(view.getMembers()).thenReturn(members);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/MyUrlProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/MyUrlProviderTest.java
index 1a27de0..f60d52a 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/MyUrlProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/MyUrlProviderTest.java
@@ -23,13 +23,13 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.google.gerrit.common.Nullable;
import com.google.inject.ProvisionException;
+import java.util.List;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import wiremock.com.google.common.collect.Lists;
@RunWith(MockitoJUnitRunner.class)
public class MyUrlProviderTest {
@@ -84,7 +84,7 @@
@Test
public void testGetJGroupsMyUrlFromListenUrlWhenMultipleListenUrlsSpecified() throws Exception {
- gerritServerConfig.setStringList(HTTPD, null, LISTEN_URL, Lists.newArrayList("a", "b"));
+ gerritServerConfig.setStringList(HTTPD, null, LISTEN_URL, List.of("a", "b"));
ProvisionException thrown = assertThrows(ProvisionException.class, () -> getMyUrlProvider());
assertThat(thrown).hasMessageThat().contains("exactly 1 value configured; found 2");
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleanerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleanerTest.java
index 0a8fc92..19c9a51 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleanerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/websession/file/FileBasedWebSessionCacheCleanerTest.java
@@ -28,6 +28,7 @@
import com.ericsson.gerrit.plugins.highavailability.websession.file.FileBasedWebSessionCacheCleaner.CleanupTask;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Provider;
+import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -41,7 +42,7 @@
@RunWith(MockitoJUnitRunner.class)
public class FileBasedWebSessionCacheCleanerTest {
- private static long CLEANUP_INTERVAL = 5000;
+ private static Duration CLEANUP_INTERVAL = Duration.ofSeconds(5);
private static String SOME_PLUGIN_NAME = "somePluginName";
@Mock private ScheduledThreadPoolExecutor executorMock;
@@ -89,7 +90,10 @@
cleaner.start();
verify(executorMock, times(1))
.scheduleAtFixedRate(
- isA(CleanupTask.class), eq(1000l), eq(CLEANUP_INTERVAL), eq(TimeUnit.MILLISECONDS));
+ isA(CleanupTask.class),
+ eq(1000l),
+ eq(CLEANUP_INTERVAL.toMillis()),
+ eq(TimeUnit.MILLISECONDS));
}
@Test