Merge branch 'stable-2.15' into stable-2.16
* stable-2.15:
Upgrade bazlets to latest stable-2.15 to build with 2.15.9 API
Upgrade bazlets to latest stable-2.15 to build with 2.15.8 API
Upgrade bazlets to latest stable-2.14 to build with 2.14.18 API
ForwardedIndexChangeHandler: Replace parameter writing with variable
PeerInfoNotAvailableException: Remove this class as no longer used
Forwarder tests: Convert useless members to local variable
ForwardedIndexChangeHandlerTest: Remove unused gitRepoMgrMock
ChangeChecker{Impl}: Remove redundant public modifier
ForwardedIndexChangeHandler: Remove redundant local variable
ChangeReindexRunnable: Replace statement lambda with expression
IndexTs: Move local variable to used scope
ForwardedIndexChangeHandler: Remove unused changeFinder parameter
Catch all exceptions when indexTs fails
Minimize use of ReviewDb when not needed
Retry change reindex because of NFS access caching
RestForwarder: Replace lambda with method reference
Always use the stored timestamp when checking for updates
Use always the last TS of the reindex across runs
GroupReindexRunnable: Replace lambdas with method reference
Support n nodes when using static strategy
Change-Id: Ifcc5dd45444df869026434323c31f139bcd56e51
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index 8f1963a..edd96d7 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -20,6 +20,7 @@
import com.google.common.base.CharMatcher;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.server.config.ConfigUtil;
@@ -32,8 +33,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +51,8 @@
// common parameters to cache and index sections
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
+ static final int DEFAULT_INDEX_MAX_TRIES = 2;
+ static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
static final int DEFAULT_THREAD_POOL_SIZE = 4;
static final String NUM_STRIPED_LOCKS = "numStripedLocks";
static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
@@ -153,11 +159,6 @@
}
}
- @Nullable
- private static String trimTrailingSlash(@Nullable String in) {
- return in == null ? null : CharMatcher.is('/').trimTrailingFrom(in);
- }
-
public static class Main {
static final String MAIN_SECTION = "main";
static final String SHARED_DIRECTORY_KEY = "sharedDirectory";
@@ -247,17 +248,20 @@
static final String STATIC_SUBSECTION = PeerInfoStrategy.STATIC.name().toLowerCase();
static final String URL_KEY = "url";
- private final String url;
+ private final Set<String> urls;
private PeerInfoStatic(Config cfg) {
- url =
- trimTrailingSlash(
- Strings.nullToEmpty(cfg.getString(PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY)));
- log.debug("Url: {}", url);
+ urls =
+ Arrays.stream(cfg.getStringList(PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY))
+ .filter(Objects::nonNull)
+ .filter(s -> !s.isEmpty())
+ .map(s -> CharMatcher.is('/').trimTrailingFrom(s))
+ .collect(Collectors.toSet());
+ log.debug("Urls: {}", urls);
}
- public String url() {
- return url;
+ public Set<String> urls() {
+ return ImmutableSet.copyOf(urls);
}
}
@@ -275,6 +279,11 @@
public String myUrl() {
return myUrl;
}
+
+ @Nullable
+ private static String trimTrailingSlash(@Nullable String in) {
+ return in == null ? in : CharMatcher.is('/').trimTrailingFrom(in);
+ }
}
public static class JGroups {
@@ -439,8 +448,12 @@
public static class Index extends Forwarding {
static final String INDEX_SECTION = "index";
+ static final String MAX_TRIES_KEY = "maxTries";
+ static final String RETRY_INTERVAL_KEY = "retryInterval";
private final int threadPoolSize;
+ private final int retryInterval;
+ private final int maxTries;
private final int numStripedLocks;
@@ -448,6 +461,8 @@
super(cfg, INDEX_SECTION);
threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+ retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
+ maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
}
public int threadPoolSize() {
@@ -457,6 +472,14 @@
public int numStripedLocks() {
return numStripedLocks;
}
+
+ public int retryInterval() {
+ return retryInterval;
+ }
+
+ public int maxTries() {
+ return maxTries;
+ }
}
public static class Websession extends Forwarding {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
index 597034d..3d3212b 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
@@ -17,11 +17,11 @@
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Provider;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
-public abstract class ExecutorProvider implements Provider<Executor>, LifecycleListener {
- private ExecutorService executor;
+public abstract class ExecutorProvider
+ implements Provider<ScheduledExecutorService>, LifecycleListener {
+ private ScheduledExecutorService executor;
protected ExecutorProvider(WorkQueue workQueue, int threadPoolSize, String threadNamePrefix) {
executor = workQueue.createQueue(threadPoolSize, threadNamePrefix);
@@ -39,7 +39,7 @@
}
@Override
- public Executor get() {
+ public ScheduledExecutorService get() {
return executor;
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
index e58f62f..86c757e 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
@@ -164,7 +164,8 @@
PeerInfoStrategy.JGROUPS, EnumSet.allOf(PeerInfoStrategy.class), "Peer info strategy");
config.setEnum(PEER_INFO_SECTION, null, STRATEGY_KEY, strategy);
if (strategy == PeerInfoStrategy.STATIC) {
- promptAndSetString("Peer URL", PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY, null);
+ promptAndSetString(
+ titleWithNote("Peer URL", "urls"), PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY, null);
} else {
promptAndSetString(
"JGroups cluster name",
@@ -291,7 +292,11 @@
}
private static String titleForOptionalWithNote(String prefix, String suffix) {
- return prefix + " (optional); manually repeat this line to configure more " + suffix;
+ return titleWithNote(prefix + " (optional)", suffix);
+ }
+
+ private static String titleWithNote(String prefix, String suffix) {
+ return prefix + "; manually repeat this line to configure more " + suffix;
}
private static String numberToString(int number) {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java
index 62a9792..1f5b56e 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java
@@ -86,10 +86,7 @@
Stream<Change> projectChangesStream =
notesFactory
.scan(repo, db, projectName)
- .map(
- (ChangeNotesResult changeNotes) -> {
- return changeNotes.notes().getChange();
- });
+ .map((ChangeNotesResult changeNotes) -> changeNotes.notes().getChange());
allChangesStream = Streams.concat(allChangesStream, projectChangesStream);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
index 40b4117..6b13bb1 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
@@ -24,7 +24,6 @@
import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.notedb.ChangeNotes;
-import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -34,8 +33,6 @@
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
@@ -58,7 +55,6 @@
private volatile LocalDateTime groupTs;
class FlusherRunner implements Runnable {
- private Map<AbstractIndexRestApiServlet.IndexName, LocalDateTime> storedTs = new HashMap<>();
@Override
public void run() {
@@ -68,12 +64,11 @@
}
private void store(AbstractIndexRestApiServlet.IndexName index, LocalDateTime latestTs) {
- LocalDateTime currTs = storedTs.get(index);
- if (currTs == null || latestTs.isAfter(currTs)) {
+ Optional<LocalDateTime> currTs = getUpdateTs(index);
+ if (!currTs.isPresent() || latestTs.isAfter(currTs.get())) {
Path indexTsFile = dataDir.resolve(index.name().toLowerCase());
try {
Files.write(indexTsFile, latestTs.format(formatter).getBytes(StandardCharsets.UTF_8));
- storedTs.put(index, currTs);
} catch (IOException e) {
log.error("Unable to update last timestamp for index " + index, e);
}
@@ -113,7 +108,7 @@
changeNotes == null
? LocalDateTime.now()
: changeNotes.getChange().getLastUpdatedOn().toLocalDateTime());
- } catch (OrmException e) {
+ } catch (Exception e) {
log.warn("Unable to update the latest TS for change {}", e);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ReindexRunnable.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ReindexRunnable.java
index 7891bbf..df3a0fd 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ReindexRunnable.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ReindexRunnable.java
@@ -33,6 +33,7 @@
private final AbstractIndexRestApiServlet.IndexName itemName;
private final OneOffRequestContext ctx;
private final IndexTs indexTs;
+ private Timestamp newLastIndexTs;
@Inject
public ReindexRunnable(
@@ -47,9 +48,8 @@
Optional<LocalDateTime> maybeIndexTs = indexTs.getUpdateTs(itemName);
String itemNameString = itemName.name().toLowerCase();
if (maybeIndexTs.isPresent()) {
- Timestamp lastIndexTs = Timestamp.valueOf(maybeIndexTs.get());
- Timestamp newLastIndexTs = lastIndexTs;
- log.debug("Scanning for all the {}s after {}", itemNameString, lastIndexTs);
+ newLastIndexTs = maxTimestamp(newLastIndexTs, Timestamp.valueOf(maybeIndexTs.get()));
+ log.debug("Scanning for all the {}s after {}", itemNameString, newLastIndexTs);
try (ManualRequestContext mctx = ctx.open();
ReviewDb db = mctx.getReviewDbProvider().get()) {
int count = 0;
@@ -57,12 +57,10 @@
Stopwatch stopwatch = Stopwatch.createStarted();
for (T c : fetchItems(db)) {
try {
- Optional<Timestamp> itemTs = indexIfNeeded(db, c, lastIndexTs);
+ Optional<Timestamp> itemTs = indexIfNeeded(db, c, newLastIndexTs);
if (itemTs.isPresent()) {
count++;
- if (itemTs.get().after(newLastIndexTs)) {
- newLastIndexTs = itemTs.get();
- }
+ newLastIndexTs = maxTimestamp(newLastIndexTs, itemTs.get());
}
} catch (Exception e) {
log.error("Unable to reindex {} {}", itemNameString, c, e);
@@ -90,6 +88,21 @@
}
}
+ private Timestamp maxTimestamp(Timestamp ts1, Timestamp ts2) {
+ if (ts1 == null) {
+ return ts2;
+ }
+
+ if (ts2 == null) {
+ return ts1;
+ }
+
+ if (ts1.after(ts2)) {
+ return ts1;
+ }
+ return ts2;
+ }
+
protected abstract Iterable<T> fetchItems(ReviewDb db) throws Exception;
protected abstract Optional<Timestamp> indexIfNeeded(ReviewDb db, T item, Timestamp sinceTs);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
index eb738d9..d34959c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
@@ -15,19 +15,26 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.Configuration.Index;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeChecker;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeDb;
+import com.ericsson.gerrit.plugins.highavailability.index.ForwardedIndexExecutor;
import com.google.common.base.Splitter;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.index.change.ChangeIndexer;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.project.NoSuchChangeException;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gwtorm.server.OrmException;
-import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Index a change using {@link ChangeIndexer}. This class is meant to be used on the receiving side
@@ -38,44 +45,114 @@
@Singleton
public class ForwardedIndexChangeHandler extends ForwardedIndexingHandler<String> {
private final ChangeIndexer indexer;
- private final SchemaFactory<ReviewDb> schemaFactory;
- private final ChangeFinder changeFinder;
+ private final ChangeDb changeDb;
+ private final ScheduledExecutorService indexExecutor;
+ private final OneOffRequestContext oneOffCtx;
+ private final int retryInterval;
+ private final int maxTries;
+ private final ChangeCheckerImpl.Factory changeCheckerFactory;
@Inject
ForwardedIndexChangeHandler(
ChangeIndexer indexer,
- SchemaFactory<ReviewDb> schemaFactory,
- ChangeFinder changeFinder,
- Configuration config) {
+ ChangeDb changeDb,
+ Configuration config,
+ @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
+ OneOffRequestContext oneOffCtx,
+ ChangeCheckerImpl.Factory changeCheckerFactory) {
super(config.index());
this.indexer = indexer;
- this.schemaFactory = schemaFactory;
- this.changeFinder = changeFinder;
+ this.changeDb = changeDb;
+ this.indexExecutor = indexExecutor;
+ this.oneOffCtx = oneOffCtx;
+ this.changeCheckerFactory = changeCheckerFactory;
+
+ Index indexConfig = config.index();
+ this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
+ this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
}
@Override
protected void doIndex(String id, Optional<IndexEvent> indexEvent)
throws IOException, OrmException {
- ChangeNotes change = null;
- try (ReviewDb db = schemaFactory.open()) {
- change = changeFinder.findOne(id);
- if (change != null) {
- change.reload();
- indexer.index(db, change.getChange());
- log.debug("Change {} successfully indexed", id);
+ doIndex(id, indexEvent, 0);
+ }
+
+ private void doIndex(String id, Optional<IndexEvent> indexEvent, int retryCount)
+ throws IOException, OrmException {
+ try {
+ ChangeChecker checker = changeCheckerFactory.create(id);
+ Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
+ if (changeNotes.isPresent()) {
+ ChangeNotes notes = changeNotes.get();
+ reindex(notes);
+
+ if (checker.isChangeUpToDate(indexEvent)) {
+ if (retryCount > 0) {
+ log.warn("Change {} has been eventually indexed after {} attempt(s)", id, retryCount);
+ } else {
+ log.debug("Change {} successfully indexed", id);
+ }
+ } else {
+ log.warn(
+ "Change {} seems too old compared to the event timestamp (event-Ts={} >> change-Ts={})",
+ id,
+ indexEvent,
+ checker);
+ rescheduleIndex(id, indexEvent, retryCount + 1);
+ }
+ } else {
+ indexer.delete(parseChangeId(id));
+ log.warn(
+ "Change {} could not be found in the local Git repository (eventTs={}), deleted from index",
+ id,
+ indexEvent);
}
} catch (Exception e) {
- if (!isCausedByNoSuchChangeException(e)) {
- throw e;
+ if (isCausedByNoSuchChangeException(e)) {
+ indexer.delete(parseChangeId(id));
+ log.warn("Error trying to index Change {}. Deleted from index", id, e);
+ return;
}
- log.debug("Change {} was deleted, aborting forwarded indexing the change.", id);
+
+ throw e;
}
- if (change == null) {
- indexer.delete(parseChangeId(id));
- log.debug("Change {} not found, deleted from index", id);
+ }
+
+ private void reindex(ChangeNotes notes) throws IOException, OrmException {
+ try (ReviewDb db = changeDb.open()) {
+ notes.reload();
+ indexer.index(db, notes.getChange());
}
}
+ private void rescheduleIndex(String id, Optional<IndexEvent> indexEvent, int retryCount) {
+ if (retryCount > maxTries) {
+ log.error(
+ "Change {} could not be indexed after {} retries. Change index could be stale.",
+ id,
+ retryCount);
+ return;
+ }
+
+ log.warn(
+ "Retrying for the #{} time to index Change {} after {} msecs",
+ retryCount,
+ id,
+ retryInterval);
+ indexExecutor.schedule(
+ () -> {
+ try (ManualRequestContext ctx = oneOffCtx.open()) {
+ Context.setForwardedEvent(true);
+ doIndex(id, indexEvent, retryCount);
+ } catch (Exception e) {
+ log.warn("Change {} could not be indexed", id, e);
+ }
+ },
+ retryInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
@Override
protected void doDelete(String id, Optional<IndexEvent> indexEvent) throws IOException {
indexer.delete(parseChangeId(id));
@@ -83,16 +160,16 @@
}
private static Change.Id parseChangeId(String id) {
- Change.Id changeId = new Change.Id(Integer.parseInt(Splitter.on("~").splitToList(id).get(1)));
- return changeId;
+ return new Change.Id(Integer.parseInt(Splitter.on("~").splitToList(id).get(1)));
}
private static boolean isCausedByNoSuchChangeException(Throwable throwable) {
- while (throwable != null) {
- if (throwable instanceof NoSuchChangeException) {
+ Throwable cause = throwable;
+ while (cause != null) {
+ if (cause instanceof NoSuchChangeException) {
return true;
}
- throwable = throwable.getCause();
+ cause = cause.getCause();
}
return false;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
index f6f24a7..037c1c6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
@@ -20,11 +20,15 @@
public class IndexEvent {
public long eventCreatedOn = System.currentTimeMillis() / 1000;
+ public String targetSha;
@Override
public String toString() {
- return "IndexEvent@"
- + LocalDateTime.ofEpochSecond(eventCreatedOn, 0, ZoneOffset.UTC)
- .format(DateTimeFormatter.ISO_DATE_TIME);
+ return "IndexEvent@" + format(eventCreatedOn) + ((targetSha != null) ? "/" + targetSha : "");
+ }
+
+ public static String format(long eventTs) {
+ return LocalDateTime.ofEpochSecond(eventTs, 0, ZoneOffset.UTC)
+ .format(DateTimeFormatter.ISO_DATE_TIME);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java
index 253dca7..f2ac080 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java
@@ -15,18 +15,15 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult;
-import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.base.Supplier;
import com.google.common.net.MediaType;
import com.google.gerrit.server.events.SupplierSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Inject;
-import com.google.inject.Provider;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
-import java.util.Optional;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpPost;
@@ -35,40 +32,30 @@
class HttpSession {
private final CloseableHttpClient httpClient;
- private final Provider<Optional<PeerInfo>> peerInfo;
private final Gson gson =
new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
@Inject
- HttpSession(CloseableHttpClient httpClient, Provider<Optional<PeerInfo>> peerInfo) {
+ HttpSession(CloseableHttpClient httpClient) {
this.httpClient = httpClient;
- this.peerInfo = peerInfo;
}
- HttpResult post(String endpoint) throws IOException {
- return post(endpoint, null);
+ HttpResult post(String uri) throws IOException {
+ return post(uri, null);
}
- HttpResult post(String endpoint, Object content) throws IOException {
- HttpPost post = new HttpPost(getPeerInfo().getDirectUrl() + endpoint);
+ HttpResult post(String uri, Object content) throws IOException {
+ HttpPost post = new HttpPost(uri);
setContent(post, content);
return httpClient.execute(post, new HttpResponseHandler());
}
- HttpResult delete(String endpoint) throws IOException {
- return delete(endpoint, null);
+ HttpResult delete(String uri) throws IOException {
+ return delete(uri, null);
}
- private PeerInfo getPeerInfo() throws PeerInfoNotAvailableException {
- PeerInfo info = peerInfo.get().orElse(null);
- if (info == null) {
- throw new PeerInfoNotAvailableException();
- }
- return info;
- }
-
- HttpResult delete(String endpoint, Object content) throws IOException {
- HttpDeleteWithBody delete = new HttpDeleteWithBody(getPeerInfo().getDirectUrl() + endpoint);
+ HttpResult delete(String uri, Object content) throws IOException {
+ HttpDeleteWithBody delete = new HttpDeleteWithBody(uri);
setContent(delete, content);
return httpClient.execute(delete, new HttpResponseHandler());
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index 1e9e931..ab7b81a 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -19,12 +19,18 @@
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult;
+import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.base.Joiner;
import com.google.gerrit.extensions.annotations.PluginName;
import com.google.gerrit.extensions.restapi.Url;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.apache.http.HttpException;
import org.apache.http.client.ClientProtocolException;
@@ -32,59 +38,54 @@
import org.slf4j.LoggerFactory;
class RestForwarder implements Forwarder {
+ enum RequestMethod {
+ POST,
+ DELETE
+ }
+
private static final Logger log = LoggerFactory.getLogger(RestForwarder.class);
private final HttpSession httpSession;
private final String pluginRelativePath;
private final Configuration cfg;
+ private final Provider<Set<PeerInfo>> peerInfoProvider;
@Inject
- RestForwarder(HttpSession httpClient, @PluginName String pluginName, Configuration cfg) {
+ RestForwarder(
+ HttpSession httpClient,
+ @PluginName String pluginName,
+ Configuration cfg,
+ Provider<Set<PeerInfo>> peerInfoProvider) {
this.httpSession = httpClient;
- this.pluginRelativePath = Joiner.on("/").join("/plugins", pluginName);
+ this.pluginRelativePath = Joiner.on("/").join("plugins", pluginName);
this.cfg = cfg;
+ this.peerInfoProvider = peerInfoProvider;
}
@Override
public boolean indexAccount(final int accountId, IndexEvent event) {
- return new Request("index account", accountId) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.post(
- Joiner.on("/").join(pluginRelativePath, "index/account", accountId), event);
- }
- }.execute();
+ return execute(RequestMethod.POST, "index account", "index/account", accountId, event);
}
@Override
public boolean indexChange(String projectName, int changeId, IndexEvent event) {
- return new Request("index change", changeId) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.post(buildIndexEndpoint(projectName, changeId), event);
- }
- }.execute();
+ return execute(
+ RequestMethod.POST,
+ "index change",
+ "index/change",
+ buildIndexEndpoint(projectName, changeId),
+ event);
}
@Override
public boolean deleteChangeFromIndex(final int changeId, IndexEvent event) {
- return new Request("delete change", changeId) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.delete(buildIndexEndpoint(changeId), event);
- }
- }.execute();
+ return execute(
+ RequestMethod.DELETE, "delete change", "index/change", buildIndexEndpoint(changeId), event);
}
@Override
public boolean indexGroup(final String uuid, IndexEvent event) {
- return new Request("index group", uuid) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.post(
- Joiner.on("/").join(pluginRelativePath, "index/group", uuid), event);
- }
- }.execute();
+ return execute(RequestMethod.POST, "index group", "index/group", uuid, event);
}
private String buildIndexEndpoint(int changeId) {
@@ -93,91 +94,131 @@
private String buildIndexEndpoint(String projectName, int changeId) {
String escapedProjectName = Url.encode(projectName);
- return Joiner.on("/")
- .join(pluginRelativePath, "index/change", escapedProjectName + '~' + changeId);
+ return escapedProjectName + '~' + changeId;
}
@Override
public boolean send(final Event event) {
- return new Request("send event", event.type) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.post(Joiner.on("/").join(pluginRelativePath, "event"), event);
- }
- }.execute();
+ return execute(RequestMethod.POST, "send event", "event", event.type, event);
}
@Override
public boolean evict(final String cacheName, final Object key) {
- return new Request("invalidate cache " + cacheName, key) {
- @Override
- HttpResult send() throws IOException {
- String json = GsonParser.toJson(cacheName, key);
- return httpSession.post(Joiner.on("/").join(pluginRelativePath, "cache", cacheName), json);
- }
- }.execute();
+ String json = GsonParser.toJson(cacheName, key);
+ return execute(RequestMethod.POST, "invalidate cache " + cacheName, "cache", cacheName, json);
}
@Override
public boolean addToProjectList(String projectName) {
- return new Request("Update project_list, add ", projectName) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.post(buildProjectListEndpoint(projectName));
- }
- }.execute();
+ return execute(
+ RequestMethod.POST,
+ "Update project_list, add ",
+ buildProjectListEndpoint(),
+ Url.encode(projectName));
}
@Override
public boolean removeFromProjectList(String projectName) {
- return new Request("Update project_list, remove ", projectName) {
- @Override
- HttpResult send() throws IOException {
- return httpSession.delete(buildProjectListEndpoint(projectName));
- }
- }.execute();
+ return execute(
+ RequestMethod.DELETE,
+ "Update project_list, remove ",
+ buildProjectListEndpoint(),
+ Url.encode(projectName));
}
- private String buildProjectListEndpoint(String projectName) {
- return Joiner.on("/")
- .join(pluginRelativePath, "cache", Constants.PROJECT_LIST, Url.encode(projectName));
+ private static String buildProjectListEndpoint() {
+ return Joiner.on("/").join("cache", Constants.PROJECT_LIST);
+ }
+
+ private boolean execute(RequestMethod method, String action, String endpoint, Object id) {
+ return execute(method, action, endpoint, id, null);
+ }
+
+ private boolean execute(
+ RequestMethod method, String action, String endpoint, Object id, Object payload) {
+ List<CompletableFuture<Boolean>> futures =
+ peerInfoProvider
+ .get()
+ .stream()
+ .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
+ .map(request -> CompletableFuture.supplyAsync(request::execute))
+ .collect(Collectors.toList());
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ return futures.stream().allMatch(CompletableFuture::join);
+ }
+
+ /**
+ * Build the request that performs one forwarding operation against a single peer.
+ *
+ * <p>The target URI is the peer's direct URL joined with the plugin path, {@code endpoint} and
+ * {@code id}, separated by "/".
+ *
+ * @param method HTTP method used to send the request
+ * @param peer node the request is sent to
+ * @param action description of the operation, used in log messages
+ * @param endpoint plugin-relative endpoint of the operation
+ * @param id identifier appended to the endpoint; also used as the key in log messages
+ * @param payload request body, or null when the operation has none
+ * @return a request that contacts the peer when executed
+ */
+ private Request createRequest(
+ RequestMethod method,
+ PeerInfo peer,
+ String action,
+ String endpoint,
+ Object id,
+ Object payload) {
+ String destination = peer.getDirectUrl();
+ return new Request(action, id, destination) {
+ @Override
+ HttpResult send() throws IOException {
+ String request = Joiner.on("/").join(destination, pluginRelativePath, endpoint, id);
+ switch (method) {
+ case POST:
+ return httpSession.post(request, payload);
+ case DELETE:
+ default:
+ // Forward the payload on DELETE as well: deleteChangeFromIndex passes its IndexEvent,
+ // and HttpSession.delete(uri) is delete(uri, null), so payload-less calls are unaffected.
+ return httpSession.delete(request, payload);
+ }
+ }
+ };
+ }
private abstract class Request {
private final String action;
private final Object key;
+ private final String destination;
+
private int execCnt;
- Request(String action, Object key) {
+ Request(String action, Object key, String destination) {
this.action = action;
this.key = key;
+ this.destination = destination;
}
boolean execute() {
- log.debug("Executing {} {}", action, key);
+ log.debug("Executing {} {} towards {}", action, key, destination);
for (; ; ) {
try {
execCnt++;
tryOnce();
- log.debug("{} {} OK", action, key);
+ log.debug("{} {} towards {} OK", action, key, destination);
return true;
} catch (ForwardingException e) {
int maxTries = cfg.http().maxTries();
- log.debug("Failed to {} {} [{}/{}]", action, key, execCnt, maxTries, e);
+ log.debug(
+ "Failed to {} {} on {} [{}/{}]", action, key, destination, execCnt, maxTries, e);
if (!e.isRecoverable()) {
- log.error("{} {} failed with unrecoverable error; giving up", action, key, e);
+ log.error(
+ "{} {} towards {} failed with unrecoverable error; giving up",
+ action,
+ key,
+ destination,
+ e);
return false;
}
if (execCnt >= maxTries) {
- log.error("Failed to {} {} after {} tries; giving up", action, key, maxTries);
+ log.error(
+ "Failed to {} {} on {} after {} tries; giving up",
+ action,
+ key,
+ destination,
+ maxTries);
return false;
}
- log.debug("Retrying to {} {}", action, key);
+ log.debug("Retrying to {} {} on {}", action, key, destination);
try {
Thread.sleep(cfg.http().retryInterval());
} catch (InterruptedException ie) {
- log.error("{} {} was interrupted; giving up", action, key, ie);
+ log.error("{} {} towards {} was interrupted; giving up", action, key, destination, ie);
Thread.currentThread().interrupt();
return false;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java
new file mode 100644
index 0000000..ce04589
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gwtorm.server.OrmException;
+import java.io.IOException;
+import java.util.Optional;
+
+/** Encapsulates the logic of verifying the up-to-date status of a change. */
+public interface ChangeChecker {
+
+ /**
+ * Return the Change notes read from ReviewDb or NoteDb.
+ *
+ * @return notes of the Change
+ * @throws OrmException if ReviewDb or NoteDb cannot be opened
+ */
+ Optional<ChangeNotes> getChangeNotes() throws OrmException;
+
+ /**
+ * Create a new index event POJO associated with the current Change.
+ *
+ * @return new IndexEvent
+ * @throws IOException if the current Change cannot be read
+ * @throws OrmException if ReviewDb cannot be opened
+ */
+ Optional<IndexEvent> newIndexEvent() throws IOException, OrmException;
+
+ /**
+ * Check if the local Change is aligned with the indexEvent received.
+ *
+ * @param indexEvent indexing event
+ * @return true if the local Change is up-to-date, false otherwise.
+ * @throws IOException if an I/O error occurred while reading the local Change
+ * @throws OrmException if the local ReviewDb cannot be opened
+ */
+ boolean isChangeUpToDate(Optional<IndexEvent> indexEvent) throws IOException, OrmException;
+
+ /**
+ * Return the last computed up-to-date Change time-stamp.
+ *
+ * <p>Compute the up-to-date Change time-stamp when it is invoked for the very first time.
+ *
+ * @return the Change timestamp epoch in seconds
+ * @throws IOException if an I/O error occurred while reading the local Change
+ * @throws OrmException if the local ReviewDb cannot be opened
+ */
+ Optional<Long> getComputedChangeTs() throws IOException, OrmException;
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
new file mode 100644
index 0000000..75f3086
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
@@ -0,0 +1,200 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.Comment;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.CommentsUtil;
+import com.google.gerrit.server.change.ChangeFinder;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gwtorm.server.OrmException;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.Objects;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link ChangeChecker}: derives a change's last-update timestamp from the change itself
+ * and its draft comments, and resolves the object name (SHA) of the change's destination branch.
+ *
+ * <p>The computed timestamp and the most recently looked-up notes are cached in instance fields.
+ */
+public class ChangeCheckerImpl implements ChangeChecker {
+  private static final Logger log = LoggerFactory.getLogger(ChangeCheckerImpl.class);
+  private final GitRepositoryManager gitRepoMgr;
+  private final CommentsUtil commentsUtil;
+  private final ChangeDb changeDb;
+  private final OneOffRequestContext oneOffReqCtx;
+  private final String changeId;
+  private final ChangeFinder changeFinder;
+  private Optional<Long> computedChangeTs = Optional.empty();
+  private Optional<ChangeNotes> changeNotes = Optional.empty();
+
+  /** Assisted-inject factory creating a checker for one change identifier. */
+  public interface Factory {
+    ChangeChecker create(String changeId);
+  }
+
+  @Inject
+  public ChangeCheckerImpl(
+      GitRepositoryManager gitRepoMgr,
+      CommentsUtil commentsUtil,
+      ChangeDb changeDb,
+      ChangeFinder changeFinder,
+      OneOffRequestContext oneOffReqCtx,
+      @Assisted String changeId) {
+    this.changeFinder = changeFinder;
+    this.gitRepoMgr = gitRepoMgr;
+    this.commentsUtil = commentsUtil;
+    this.changeDb = changeDb;
+    this.oneOffReqCtx = oneOffReqCtx;
+    this.changeId = changeId;
+  }
+
+  @Override
+  public Optional<IndexEvent> newIndexEvent() throws IOException, OrmException {
+    return getComputedChangeTs()
+        .map(
+            ts -> {
+              IndexEvent event = new IndexEvent();
+              event.eventCreatedOn = ts;
+              event.targetSha = getBranchTargetSha();
+              return event;
+            });
+  }
+
+  @Override
+  public Optional<ChangeNotes> getChangeNotes() throws OrmException {
+    try (ManualRequestContext ctx = oneOffReqCtx.open()) {
+      // Remember the result: getBranchTargetSha() reads the project and branch from it.
+      changeNotes = Optional.ofNullable(changeFinder.findOne(changeId));
+      return changeNotes;
+    }
+  }
+
+  @Override
+  public boolean isChangeUpToDate(Optional<IndexEvent> indexEvent)
+      throws IOException, OrmException {
+    getComputedChangeTs();
+    log.debug("Checking change {} against index event {}", this, indexEvent);
+    if (!computedChangeTs.isPresent()) {
+      log.warn("Unable to compute last updated ts for change {}", changeId);
+      return false;
+    }
+    if (!indexEvent.isPresent()) {
+      // No event to compare against: the local change is considered current.
+      return true;
+    }
+
+    // Unbox once so every comparison below is numeric.
+    long localTs = computedChangeTs.get();
+    IndexEvent event = indexEvent.get();
+    long eventTs = event.eventCreatedOn;
+    if (event.targetSha == null) {
+      // The event carries no branch SHA, so only the timestamps can be compared.
+      return localTs >= eventTs;
+    }
+    // Same timestamp is only up-to-date if the target branch points at the same commit.
+    return localTs > eventTs
+        || (localTs == eventTs && Objects.equals(getBranchTargetSha(), event.targetSha));
+  }
+
+  @Override
+  public Optional<Long> getComputedChangeTs() throws IOException, OrmException {
+    if (!computedChangeTs.isPresent()) {
+      computedChangeTs = computeLastChangeTs();
+    }
+    return computedChangeTs;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return "change-id="
+          + changeId
+          + "@"
+          + getComputedChangeTs().map(IndexEvent::format)
+          + "/"
+          + getBranchTargetSha();
+    } catch (IOException | OrmException e) {
+      log.error("Unable to render change {}", changeId, e);
+      return "change-id=" + changeId;
+    }
+  }
+
+  /**
+   * Resolves the object name (SHA) that the change's destination branch currently points at.
+   *
+   * @return the SHA name, or {@code null} if the change notes are unavailable, the ref is
+   *     missing, or the repository cannot be read
+   */
+  private String getBranchTargetSha() {
+    if (!changeNotes.isPresent()) {
+      // Reachable from toString() and from callers that looked up a missing change: answer
+      // "unknown" like the other failure paths instead of throwing NoSuchElementException.
+      return null;
+    }
+    ChangeNotes notes = changeNotes.get();
+    try (Repository repo = gitRepoMgr.openRepository(notes.getProjectName())) {
+      String refName = notes.getChange().getDest().get();
+      Ref ref = repo.exactRef(refName);
+      if (ref == null) {
+        log.warn("Unable to find target ref {} for change {}", refName, changeId);
+        return null;
+      }
+      return ref.getTarget().getObjectId().getName();
+    } catch (IOException e) {
+      log.warn("Unable to resolve target branch SHA for change {}", changeId, e);
+      return null;
+    }
+  }
+
+  /** Computes the timestamp for the freshly looked-up change, or empty if it is not found. */
+  private Optional<Long> computeLastChangeTs() throws OrmException {
+    try (ReviewDb db = changeDb.open()) {
+      return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(db, notes));
+    }
+  }
+
+  /**
+   * Returns, in epoch seconds, the later of the change's last update and its newest draft
+   * comment. If the draft comments cannot be read, a warning is logged and they are skipped.
+   */
+  private long getTsFromChangeAndDraftComments(ReviewDb db, ChangeNotes notes) {
+    Change change = notes.getChange();
+    Timestamp changeTs = change.getLastUpdatedOn();
+    try {
+      // Iterate the notes passed in rather than re-reading the cached field.
+      for (Comment comment : commentsUtil.draftByChange(db, notes)) {
+        Timestamp commentTs = comment.writtenOn;
+        changeTs = commentTs.after(changeTs) ? commentTs : changeTs;
+      }
+    } catch (OrmException e) {
+      log.warn("Unable to access draft comments for change {}", change, e);
+    }
+    return changeTs.getTime() / 1000;
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeDb.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeDb.java
new file mode 100644
index 0000000..bef5363
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeDb.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.notedb.NotesMigration;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.Inject;
+
+public class ChangeDb {
+ private static final DisabledReviewDb disabledReviewDb = new DisabledReviewDb();
+
+ private final NotesMigration migration;
+ private final SchemaFactory<ReviewDb> schemaFactory;
+
+ @Inject
+ public ChangeDb(NotesMigration migration, SchemaFactory<ReviewDb> schemaFactory) {
+ this.migration = migration;
+ this.schemaFactory = schemaFactory;
+ }
+
+ public ReviewDb open() throws OrmException {
+ return migration.readChanges() ? disabledReviewDb : schemaFactory.open();
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/DisabledReviewDb.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/DisabledReviewDb.java
new file mode 100644
index 0000000..25ec6e8
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/DisabledReviewDb.java
@@ -0,0 +1,202 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.Change.Id;
+import com.google.gerrit.reviewdb.server.ChangeAccess;
+import com.google.gerrit.reviewdb.server.ChangeMessageAccess;
+import com.google.gerrit.reviewdb.server.PatchLineCommentAccess;
+import com.google.gerrit.reviewdb.server.PatchSetAccess;
+import com.google.gerrit.reviewdb.server.PatchSetApprovalAccess;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.reviewdb.server.SchemaVersionAccess;
+import com.google.gwtorm.server.Access;
+import com.google.gwtorm.server.AtomicUpdate;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.ResultSet;
+import com.google.gwtorm.server.StatementExecutor;
+import java.util.Map;
+
+/** ReviewDb that is disabled. */
+@SuppressWarnings("deprecation")
+public class DisabledReviewDb implements ReviewDb {
+ public static class Disabled extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ private Disabled() {
+ super("ReviewDb is disabled for changes");
+ }
+ }
+
+ public static class DisabledChangeAccess implements ChangeAccess {
+
+ @Override
+ public String getRelationName() {
+ throw new Disabled();
+ }
+
+ @Override
+ public int getRelationID() {
+ throw new Disabled();
+ }
+
+ @Override
+ public ResultSet<Change> iterateAllEntities() throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public Id primaryKey(Change entity) {
+ throw new Disabled();
+ }
+
+ @Override
+ public Map<Id, Change> toMap(Iterable<Change> c) {
+ throw new Disabled();
+ }
+
+ @Override
+ public CheckedFuture<Change, OrmException> getAsync(Id key) {
+ throw new Disabled();
+ }
+
+ @Override
+ public ResultSet<Change> get(Iterable<Id> keys) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public void insert(Iterable<Change> instances) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public void update(Iterable<Change> instances) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public void upsert(Iterable<Change> instances) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public void deleteKeys(Iterable<Id> keys) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public void delete(Iterable<Change> instances) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public void beginTransaction(Id key) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public Change atomicUpdate(Id key, AtomicUpdate<Change> update) throws OrmException {
+ throw new Disabled();
+ }
+
+ @Override
+ public Change get(Id id) throws OrmException {
+ return null;
+ }
+
+ @Override
+ public ResultSet<Change> all() throws OrmException {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ // Do nothing.
+ }
+
+ @Override
+ public void commit() {
+ throw new Disabled();
+ }
+
+ @Override
+ public void rollback() {
+ throw new Disabled();
+ }
+
+ @Override
+ public void updateSchema(StatementExecutor e) {
+ throw new Disabled();
+ }
+
+ @Override
+ public void pruneSchema(StatementExecutor e) {
+ throw new Disabled();
+ }
+
+ @Override
+ public Access<?, ?>[] allRelations() {
+ throw new Disabled();
+ }
+
+ @Override
+ public SchemaVersionAccess schemaVersion() {
+ throw new Disabled();
+ }
+
+ @Override
+ public ChangeAccess changes() {
+ return new DisabledChangeAccess();
+ }
+
+ @Override
+ public PatchSetApprovalAccess patchSetApprovals() {
+ throw new Disabled();
+ }
+
+ @Override
+ public ChangeMessageAccess changeMessages() {
+ throw new Disabled();
+ }
+
+ @Override
+ public PatchSetAccess patchSets() {
+ throw new Disabled();
+ }
+
+ @Override
+ public PatchLineCommentAccess patchComments() {
+ throw new Disabled();
+ }
+
+ @Override
+ public int nextAccountId() {
+ throw new Disabled();
+ }
+
+ @Override
+ public int nextAccountGroupId() {
+ throw new Disabled();
+ }
+
+ @Override
+ public int nextChangeId() {
+ throw new Disabled();
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/PeerInfoNotAvailableException.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutor.java
similarity index 61%
rename from src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/PeerInfoNotAvailableException.java
rename to src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutor.java
index df94f52..44c84dc 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/PeerInfoNotAvailableException.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutor.java
@@ -1,4 +1,4 @@
-// Copyright (C) 2017 The Android Open Source Project
+// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+package com.ericsson.gerrit.plugins.highavailability.index;
-import java.io.IOException;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
-public class PeerInfoNotAvailableException extends IOException {
- private static final long serialVersionUID = 1L;
-}
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ForwardedIndexExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
new file mode 100644
index 0000000..2112dbe
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class ForwardedIndexExecutorProvider extends ExecutorProvider {
+
+ @Inject
+ ForwardedIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
+ super(workQueue, config.index().threadPoolSize(), "Forwarded-Index-Event");
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 106422d..3010689 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -27,20 +27,28 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class IndexEventHandler
implements ChangeIndexedListener, AccountIndexedListener, GroupIndexedListener {
+ private static final Logger log = LoggerFactory.getLogger(IndexEventHandler.class);
private final Executor executor;
private final Forwarder forwarder;
private final String pluginName;
private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final ChangeCheckerImpl.Factory changeChecker;
@Inject
IndexEventHandler(
- @IndexExecutor Executor executor, @PluginName String pluginName, Forwarder forwarder) {
+ @IndexExecutor Executor executor,
+ @PluginName String pluginName,
+ Forwarder forwarder,
+ ChangeCheckerImpl.Factory changeChecker) {
this.forwarder = forwarder;
this.executor = executor;
this.pluginName = pluginName;
+ this.changeChecker = changeChecker;
}
@Override
@@ -75,9 +83,19 @@
private void executeIndexChangeTask(String projectName, int id, boolean deleted) {
if (!Context.isForwardedEvent()) {
- IndexChangeTask task = new IndexChangeTask(projectName, id, deleted);
- if (queuedTasks.add(task)) {
- executor.execute(task);
+ ChangeChecker checker = changeChecker.create(projectName + "~" + id);
+ try {
+ checker
+ .newIndexEvent()
+ .map(event -> new IndexChangeTask(projectName, id, deleted, event))
+ .ifPresent(
+ task -> {
+ if (queuedTasks.add(task)) {
+ executor.execute(task);
+ }
+ });
+ } catch (Exception e) {
+ log.warn("Unable to create task to handle change {}~{}", projectName, id, e);
}
}
}
@@ -89,6 +107,10 @@
indexEvent = new IndexEvent();
}
+ IndexTask(IndexEvent indexEvent) {
+ this.indexEvent = indexEvent;
+ }
+
@Override
public void run() {
queuedTasks.remove(this);
@@ -103,7 +125,8 @@
private final int changeId;
private final String projectName;
- IndexChangeTask(String projectName, int changeId, boolean deleted) {
+ IndexChangeTask(String projectName, int changeId, boolean deleted, IndexEvent indexEvent) {
+ super(indexEvent);
this.projectName = projectName;
this.changeId = changeId;
this.deleted = deleted;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index f88a806..ebf8fdf 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -19,16 +19,26 @@
import com.google.gerrit.extensions.events.GroupIndexedListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
public class IndexModule extends LifecycleModule {
@Override
protected void configure() {
bind(Executor.class).annotatedWith(IndexExecutor.class).toProvider(IndexExecutorProvider.class);
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(ForwardedIndexExecutor.class)
+ .toProvider(ForwardedIndexExecutorProvider.class);
listener().to(IndexExecutorProvider.class);
DynamicSet.bind(binder(), ChangeIndexedListener.class).to(IndexEventHandler.class);
DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
DynamicSet.bind(binder(), GroupIndexedListener.class).to(IndexEventHandler.class);
+
+ install(
+ new FactoryModuleBuilder()
+ .implement(ChangeChecker.class, ChangeCheckerImpl.class)
+ .build(ChangeCheckerImpl.Factory.class));
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java
index 577d66d..68707c2 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PeerInfoModule.java
@@ -18,7 +18,7 @@
import com.ericsson.gerrit.plugins.highavailability.peers.jgroups.JGroupsPeerInfoProvider;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.TypeLiteral;
-import java.util.Optional;
+import java.util.Set;
public class PeerInfoModule extends LifecycleModule {
@@ -32,11 +32,10 @@
protected void configure() {
switch (strategy) {
case STATIC:
- bind(new TypeLiteral<Optional<PeerInfo>>() {})
- .toProvider(PluginConfigPeerInfoProvider.class);
+ bind(new TypeLiteral<Set<PeerInfo>>() {}).toProvider(PluginConfigPeerInfoProvider.class);
break;
case JGROUPS:
- bind(new TypeLiteral<Optional<PeerInfo>>() {}).toProvider(JGroupsPeerInfoProvider.class);
+ bind(new TypeLiteral<Set<PeerInfo>>() {}).toProvider(JGroupsPeerInfoProvider.class);
listener().to(JGroupsPeerInfoProvider.class);
break;
default:
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PluginConfigPeerInfoProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PluginConfigPeerInfoProvider.java
index f39eb6b..9233c79 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PluginConfigPeerInfoProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/PluginConfigPeerInfoProvider.java
@@ -18,20 +18,21 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
-import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
@Singleton
-public class PluginConfigPeerInfoProvider implements Provider<Optional<PeerInfo>> {
+public class PluginConfigPeerInfoProvider implements Provider<Set<PeerInfo>> {
- private final Optional<PeerInfo> peerInfo;
+ private final Set<PeerInfo> peers;
@Inject
PluginConfigPeerInfoProvider(Configuration cfg) {
- peerInfo = Optional.of(new PeerInfo(cfg.peerInfoStatic().url()));
+ peers = cfg.peerInfoStatic().urls().stream().map(PeerInfo::new).collect(Collectors.toSet());
}
@Override
- public Optional<PeerInfo> get() {
- return peerInfo;
+ public Set<PeerInfo> get() {
+ return peers;
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
index cf46ac3..596f86e 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
@@ -16,6 +16,7 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -23,6 +24,7 @@
import java.net.InetAddress;
import java.nio.file.Path;
import java.util.Optional;
+import java.util.Set;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
@@ -43,7 +45,7 @@
*/
@Singleton
public class JGroupsPeerInfoProvider extends ReceiverAdapter
- implements Provider<Optional<PeerInfo>>, LifecycleListener {
+ implements Provider<Set<PeerInfo>>, LifecycleListener {
private static final Logger log = LoggerFactory.getLogger(JGroupsPeerInfoProvider.class);
private static final String JGROUPS_LOG_FACTORY_PROPERTY = "jgroups.logging.log_factory_class";
@@ -85,7 +87,6 @@
@Override
public void viewAccepted(View view) {
log.info("viewAccepted(view: {}) called", view);
-
synchronized (this) {
if (view.getMembers().size() > 2) {
log.warn(
@@ -164,8 +165,8 @@
}
@Override
- public Optional<PeerInfo> get() {
- return peerInfo;
+ public Set<PeerInfo> get() {
+ return peerInfo.isPresent() ? ImmutableSet.of(peerInfo.get()) : ImmutableSet.of();
}
@Override
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 331dae5..b2040d5 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -1,3 +1,4 @@
+
This plugin allows making Gerrit highly available by having redundant Gerrit
masters.
@@ -7,10 +8,10 @@
* sharing the git repositories using a shared file system (e.g. NFS)
* behind a load balancer (e.g. HAProxy)
-Currently, the only mode supported is one primary (active) master and one backup
-(passive) master but eventually the plan is to support `n` active masters. In
+Currently, the mode supported is one primary (active) master and multiple backup
+(passive) masters but eventually the plan is to support `n` active masters. In
the active/passive mode, the active master is handling all traffic while the
-passive is kept updated to be always ready to take over.
+passives are kept updated to be always ready to take over.
Even if database and git repositories are shared by the masters, there are a few
areas of concern in order to be able to switch traffic between masters in a
@@ -23,38 +24,38 @@
* web sessions
They need either to be shared or kept local to each master but synchronized.
-This plugin needs to be installed in both masters and will take care of sharing
+This plugin needs to be installed in all the masters and it will take care of sharing
or synchronizing them.
#### Caches
Every time a cache eviction occurs in one of the masters, the eviction will be
-forwarded the other master so its caches do not contain stale entries.
+forwarded to the other masters so their caches do not contain stale entries.
#### Secondary indexes
Every time the secondary index is modified in one of the masters, e.g., a change
-is added, updated or removed from the index, the other master's index is
+is added, updated or removed from the index, the other masters' indexes are
updated accordingly. This way, both indexes are kept synchronized.
#### Stream events
Every time a stream event occurs in one of the masters (see [more events info]
(https://gerrit-review.googlesource.com/Documentation/cmd-stream-events.html#events)),
-the event is forwarded to the other master which re-plays it. This way, the
+the event is forwarded to the other masters which re-play it. This way, the
output of the stream-events command is the same, no matter which master a client
is connected to.
#### Web session
The built-in Gerrit H2 based web session cache is replaced with a file based
-implementation that is shared amongst both masters.
+implementation that is shared amongst the masters.
## Setup
Prerequisites:
-* Unique database server must be accessible from both masters
+* Unique database server must be accessible from all the masters
* Git repositories must be located on a shared file system
* A directory on a shared file system must be available for @PLUGIN@ to use
-For both masters:
+For the masters:
* Configure database section in gerrit.config to use the shared database
* Configure gerrit.basePath in gerrit.config to the shared repositories location
@@ -69,7 +70,7 @@
sharedDirectory = /directory/accessible/from/both/masters
[peerInfo "static"]
- url = http://backupMasterHost:8081/
+ url = http://backupMasterHost1:8081/
[http]
user = username
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index b59b1fc..04ccafc 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -1,7 +1,8 @@
+
@PLUGIN@ Configuration
=========================
-The @PLUGIN@ plugin must be installed on both instances and the following fields
+The @PLUGIN@ plugin must be installed on all the instances and the following fields
should be specified in `$site_path/etc/@PLUGIN@.config` file:
File '@PLUGIN@.config'
@@ -17,7 +18,8 @@
[peerInfo]
strategy = static
[peerInfo "static"]
- url = target_instance_url
+ url = first_target_instance_url
+ url = second_target_instance_url
[http]
user = username
password = password
@@ -93,7 +95,8 @@
a member joins or leaves the cluster.
```peerInfo.static.url```
-: Specify the URL for the peer instance.
+: Specify the URL for the peer instance. If more than one peer instance is to be
+ configured, add as many url entries as necessary.
```peerInfo.jgroups.myUrl```
: The URL of this instance to be broadcast to other peers. If not specified, the
@@ -192,6 +195,17 @@
: Maximum number of threads used to send index events to the target instance.
Defaults to 4.
+```index.maxTries```
+: Maximum number of times the plugin should attempt to reindex changes.
+ Setting this value to 0 will disable retries. After this number of failed tries,
+ an error is logged and the local index should be considered stale and needs
+ to be investigated and manually reindexed.
+ Defaults to 2.
+
+```index.retryInterval```
+: The interval of time in milliseconds between the subsequent auto-retries.
+ Defaults to 30000 (30 seconds).
+
```websession.synchronize```
: Whether to synchronize web sessions.
Defaults to true.
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
index efb3b75..63ab408 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
@@ -69,6 +69,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.List;
import org.eclipse.jgit.lib.Config;
import org.junit.Before;
import org.junit.Test;
@@ -84,6 +85,7 @@
private static final String PASS = "fakePass";
private static final String USER = "fakeUser";
private static final String URL = "http://fakeUrl";
+ private static final List<String> URLS = ImmutableList.of(URL, "http://anotherUrl/");
private static final int TIMEOUT = 5000;
private static final int MAX_TRIES = 5;
private static final int RETRY_INTERVAL = 1000;
@@ -122,17 +124,12 @@
}
@Test
- public void testGetUrl() throws Exception {
- assertThat(getConfiguration().peerInfoStatic().url()).isEmpty();
+ public void testGetUrls() throws Exception {
+ assertThat(getConfiguration().peerInfoStatic().urls()).isEmpty();
- globalPluginConfig.setString(PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY, URL);
- assertThat(getConfiguration().peerInfoStatic().url()).isEqualTo(URL);
- }
-
- @Test
- public void testGetUrlIsDroppingTrailingSlash() throws Exception {
- globalPluginConfig.setString(PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY, URL + "/");
- assertThat(getConfiguration().peerInfoStatic().url()).isEqualTo(URL);
+ globalPluginConfig.setStringList(PEER_INFO_SECTION, STATIC_SUBSECTION, URL_KEY, URLS);
+ assertThat(getConfiguration().peerInfoStatic().urls())
+ .containsAllIn(ImmutableList.of(URL, "http://anotherUrl"));
}
@Test
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
index 54cd266..6bfc58b 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -25,17 +25,19 @@
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeChecker;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeDb;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.index.change.ChangeIndexer;
import com.google.gerrit.server.notedb.ChangeNotes;
-import com.google.gerrit.server.project.NoSuchChangeException;
+import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.gwtorm.server.OrmException;
-import com.google.gwtorm.server.SchemaFactory;
import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -57,40 +59,58 @@
private static final boolean DO_NOT_THROW_ORM_EXCEPTION = false;
private static final boolean THROW_IO_EXCEPTION = true;
private static final boolean THROW_ORM_EXCEPTION = true;
+ private static final boolean CHANGE_UP_TO_DATE = true;
+ private static final boolean CHANGE_OUTDATED = false;
@Rule public ExpectedException exception = ExpectedException.none();
@Mock private ChangeIndexer indexerMock;
- @Mock private SchemaFactory<ReviewDb> schemaFactoryMock;
+ @Mock private ChangeDb changeDbMock;
@Mock private ReviewDb dbMock;
- @Mock private ChangeFinder changeFinderMock;
@Mock private ChangeNotes changeNotes;
@Mock private Configuration configMock;
@Mock private Configuration.Index indexMock;
+ @Mock private ScheduledExecutorService indexExecutorMock;
+ @Mock private OneOffRequestContext ctxMock;
+ @Mock private ChangeCheckerImpl.Factory changeCheckerFactoryMock;
+ @Mock private ChangeChecker changeCheckerAbsentMock;
+ @Mock private ChangeChecker changeCheckerPresentMock;
private ForwardedIndexChangeHandler handler;
private Change.Id id;
- private Change change;
@Before
public void setUp() throws Exception {
- when(schemaFactoryMock.open()).thenReturn(dbMock);
+ when(changeDbMock.open()).thenReturn(dbMock);
id = new Change.Id(TEST_CHANGE_NUMBER);
- change = new Change(null, id, null, null, TimeUtil.nowTs());
+ Change change = new Change(null, id, null, null, TimeUtil.nowTs());
when(changeNotes.getChange()).thenReturn(change);
when(configMock.index()).thenReturn(indexMock);
when(indexMock.numStripedLocks()).thenReturn(10);
+ when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
handler =
new ForwardedIndexChangeHandler(
- indexerMock, schemaFactoryMock, changeFinderMock, configMock);
+ indexerMock,
+ changeDbMock,
+ configMock,
+ indexExecutorMock,
+ ctxMock,
+ changeCheckerFactoryMock);
}
@Test
- public void changeIsIndexed() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS);
+ public void changeIsIndexedWhenUpToDate() throws Exception {
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
verify(indexerMock, times(1)).index(any(ReviewDb.class), any(Change.class));
}
@Test
+ public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
+ handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.of(new IndexEvent()));
+ verify(indexerMock, times(1)).index(any(ReviewDb.class), any(Change.class));
+ }
+
+ @Test
public void changeIsDeletedFromIndex() throws Exception {
handler.index(TEST_CHANGE_ID, Operation.DELETE, Optional.empty());
verify(indexerMock, times(1)).delete(id);
@@ -98,43 +118,29 @@
@Test
public void changeToIndexDoesNotExist() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_DOES_NOT_EXIST);
+ setupChangeAccessRelatedMocks(CHANGE_DOES_NOT_EXIST, CHANGE_OUTDATED);
handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
verify(indexerMock, times(1)).delete(id);
}
@Test
public void schemaThrowsExceptionWhenLookingUpForChange() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_ORM_EXCEPTION);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_ORM_EXCEPTION, CHANGE_UP_TO_DATE);
exception.expect(OrmException.class);
handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
}
@Test
- public void indexerThrowsNoSuchChangeExceptionTryingToPostChange() throws Exception {
- doThrow(new NoSuchChangeException(id)).when(schemaFactoryMock).open();
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
- verify(indexerMock, times(1)).delete(id);
- }
-
- @Test
- public void indexerThrowsNestedNoSuchChangeExceptionTryingToPostChange() throws Exception {
- OrmException e = new OrmException("test", new NoSuchChangeException(id));
- doThrow(e).when(schemaFactoryMock).open();
- handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
- verify(indexerMock, times(1)).delete(id);
- }
-
- @Test
public void indexerThrowsIOExceptionTryingToIndexChange() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS, DO_NOT_THROW_ORM_EXCEPTION, THROW_IO_EXCEPTION);
+ setupChangeAccessRelatedMocks(
+ CHANGE_EXISTS, DO_NOT_THROW_ORM_EXCEPTION, THROW_IO_EXCEPTION, CHANGE_UP_TO_DATE);
exception.expect(IOException.class);
handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
}
@Test
public void shouldSetAndUnsetForwardedContext() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
// this doAnswer is to allow to assert that context is set to forwarded
// while cache eviction is called.
doAnswer(
@@ -155,7 +161,7 @@
@Test
public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception {
- setupChangeAccessRelatedMocks(CHANGE_EXISTS);
+ setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
doAnswer(
(Answer<Void>)
invocation -> {
@@ -177,33 +183,38 @@
verify(indexerMock, times(1)).index(any(ReviewDb.class), any(Change.class));
}
- private void setupChangeAccessRelatedMocks(boolean changeExist) throws Exception {
+ private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate)
+ throws Exception {
setupChangeAccessRelatedMocks(
- changeExist, DO_NOT_THROW_ORM_EXCEPTION, DO_NOT_THROW_IO_EXCEPTION);
- }
-
- private void setupChangeAccessRelatedMocks(boolean changeExist, boolean ormException)
- throws OrmException, IOException {
- setupChangeAccessRelatedMocks(changeExist, ormException, DO_NOT_THROW_IO_EXCEPTION);
+ changeExist, DO_NOT_THROW_ORM_EXCEPTION, DO_NOT_THROW_IO_EXCEPTION, changeUpToDate);
}
private void setupChangeAccessRelatedMocks(
- boolean changeExists, boolean ormException, boolean ioException)
+ boolean changeExist, boolean ormException, boolean changeUpToDate)
+ throws OrmException, IOException {
+ setupChangeAccessRelatedMocks(
+ changeExist, ormException, DO_NOT_THROW_IO_EXCEPTION, changeUpToDate);
+ }
+
+ private void setupChangeAccessRelatedMocks(
+ boolean changeExists, boolean ormException, boolean ioException, boolean changeIsUpToDate)
throws OrmException, IOException {
if (ormException) {
- doThrow(new OrmException("")).when(schemaFactoryMock).open();
+ doThrow(new OrmException("")).when(changeDbMock).open();
} else {
- when(schemaFactoryMock.open()).thenReturn(dbMock);
- if (changeExists) {
- when(changeFinderMock.findOne(TEST_CHANGE_ID)).thenReturn(changeNotes);
- if (ioException) {
- doThrow(new IOException("io-error"))
- .when(indexerMock)
- .index(any(ReviewDb.class), any(Change.class));
- }
- } else {
- when(changeFinderMock.findOne(TEST_CHANGE_ID)).thenReturn(null);
+ when(changeDbMock.open()).thenReturn(dbMock);
+ }
+
+ if (changeExists) {
+ when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock);
+ when(changeCheckerPresentMock.getChangeNotes()).thenReturn(Optional.of(changeNotes));
+ if (ioException) {
+ doThrow(new IOException("io-error"))
+ .when(indexerMock)
+ .index(any(ReviewDb.class), any(Change.class));
}
}
+
+ when(changeCheckerPresentMock.isChangeUpToDate(any())).thenReturn(changeIsUpToDate);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
index b6eba3d..5e0d4c9 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
@@ -15,35 +15,27 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
-import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.delete;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
-import static com.github.tomakehurst.wiremock.client.WireMock.exactly;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
-import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.google.common.truth.Truth.assertThat;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult;
-import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.github.tomakehurst.wiremock.http.Fault;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
-import com.google.inject.util.Providers;
-import java.io.IOException;
import java.net.SocketTimeoutException;
-import java.util.Optional;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Answers;
public class HttpSessionTest {
+
private static final int MAX_TRIES = 3;
private static final int RETRY_INTERVAL = 250;
private static final int TIMEOUT = 500;
@@ -64,12 +56,13 @@
@Rule public WireMockRule wireMockRule = new WireMockRule(0);
- private Configuration configMock;
+ private String uri;
@Before
public void setUp() throws Exception {
String url = "http://localhost:" + wireMockRule.port();
- configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
+ uri = url + ENDPOINT;
+ Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
when(configMock.http().user()).thenReturn("user");
when(configMock.http().password()).thenReturn("pass");
when(configMock.http().maxTries()).thenReturn(MAX_TRIES);
@@ -77,11 +70,7 @@
when(configMock.http().socketTimeout()).thenReturn(TIMEOUT);
when(configMock.http().retryInterval()).thenReturn(RETRY_INTERVAL);
- PeerInfo peerInfo = mock(PeerInfo.class);
- when(peerInfo.getDirectUrl()).thenReturn(url);
- httpSession =
- new HttpSession(
- new HttpClientProvider(configMock).get(), Providers.of(Optional.of(peerInfo)));
+ httpSession = new HttpSession(new HttpClientProvider(configMock).get());
}
@Test
@@ -89,7 +78,7 @@
wireMockRule.givenThat(
post(urlEqualTo(ENDPOINT)).willReturn(aResponse().withStatus(NO_CONTENT)));
- assertThat(httpSession.post(ENDPOINT).isSuccessful()).isTrue();
+ assertThat(httpSession.post(uri).isSuccessful()).isTrue();
}
@Test
@@ -98,7 +87,7 @@
post(urlEqualTo(ENDPOINT))
.withRequestBody(equalTo(BODY))
.willReturn(aResponse().withStatus(NO_CONTENT)));
- assertThat(httpSession.post(ENDPOINT, BODY).isSuccessful()).isTrue();
+ assertThat(httpSession.post(uri, BODY).isSuccessful()).isTrue();
}
@Test
@@ -106,7 +95,7 @@
wireMockRule.givenThat(
delete(urlEqualTo(ENDPOINT)).willReturn(aResponse().withStatus(NO_CONTENT)));
- assertThat(httpSession.delete(ENDPOINT).isSuccessful()).isTrue();
+ assertThat(httpSession.delete(uri).isSuccessful()).isTrue();
}
@Test
@@ -116,7 +105,7 @@
post(urlEqualTo(ENDPOINT))
.willReturn(aResponse().withStatus(UNAUTHORIZED).withBody(expected)));
- HttpResult result = httpSession.post(ENDPOINT);
+ HttpResult result = httpSession.post(uri);
assertThat(result.isSuccessful()).isFalse();
assertThat(result.getMessage()).isEqualTo(expected);
}
@@ -128,7 +117,7 @@
post(urlEqualTo(ENDPOINT))
.willReturn(aResponse().withStatus(NOT_FOUND).withBody(expected)));
- HttpResult result = httpSession.post(ENDPOINT);
+ HttpResult result = httpSession.post(uri);
assertThat(result.isSuccessful()).isFalse();
assertThat(result.getMessage()).isEqualTo(expected);
}
@@ -139,7 +128,7 @@
post(urlEqualTo(ENDPOINT))
.willReturn(aResponse().withStatus(ERROR).withBody(ERROR_MESSAGE)));
- HttpResult result = httpSession.post(ENDPOINT);
+ HttpResult result = httpSession.post(uri);
assertThat(result.isSuccessful()).isFalse();
assertThat(result.getMessage()).isEqualTo(ERROR_MESSAGE);
}
@@ -170,7 +159,7 @@
.whenScenarioStateIs(THIRD_TRY)
.willReturn(aResponse().withFixedDelay(TIMEOUT)));
- httpSession.post(ENDPOINT);
+ httpSession.post(uri);
}
@Test
@@ -179,19 +168,6 @@
post(urlEqualTo(ENDPOINT))
.willReturn(aResponse().withFault(Fault.MALFORMED_RESPONSE_CHUNK)));
- assertThat(httpSession.post(ENDPOINT).isSuccessful()).isFalse();
- }
-
- @Test
- public void testNoRequestWhenPeerInfoUnknown() throws IOException {
- httpSession =
- new HttpSession(new HttpClientProvider(configMock).get(), Providers.of(Optional.empty()));
- try {
- httpSession.post(ENDPOINT);
- fail("Expected PeerInfoNotAvailableException");
- } catch (PeerInfoNotAvailableException e) {
- // good
- }
- verify(exactly(0), anyRequestedFor(anyUrl()));
+ assertThat(httpSession.post(uri).isSuccessful()).isFalse();
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index fd2aa52..c5b52a0 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -26,22 +26,27 @@
import com.ericsson.gerrit.plugins.highavailability.cache.Constants;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult;
+import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableSet;
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.AccountGroup;
import com.google.gerrit.server.events.Event;
import com.google.gson.GsonBuilder;
+import com.google.inject.Provider;
import java.io.IOException;
+import java.util.Set;
import javax.net.ssl.SSLException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Answers;
public class RestForwarderTest {
+ private static final String URL = "http://fake.com";
private static final String PLUGIN_NAME = "high-availability";
private static final String EMPTY_MSG = "";
private static final String ERROR = "Error";
- private static final String PLUGINS = "/plugins";
+ private static final String PLUGINS = "plugins";
private static final String PROJECT_TO_ADD = "projectToAdd";
private static final String PROJECT_TO_DELETE = "projectToDelete";
private static final String SUCCESS = "Success";
@@ -54,30 +59,41 @@
private static final String PROJECT_NAME_URL_END = "test%2Fproject";
private static final String INDEX_CHANGE_ENDPOINT =
Joiner.on("/")
- .join(PLUGINS, PLUGIN_NAME, "index/change", PROJECT_NAME_URL_END + "~" + CHANGE_NUMBER);
+ .join(
+ URL,
+ PLUGINS,
+ PLUGIN_NAME,
+ "index/change",
+ PROJECT_NAME_URL_END + "~" + CHANGE_NUMBER);
private static final String DELETE_CHANGE_ENDPOINT =
- Joiner.on("/").join("/plugins", PLUGIN_NAME, "index/change", "~" + CHANGE_NUMBER);
+ Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "index/change", "~" + CHANGE_NUMBER);
private static final int ACCOUNT_NUMBER = 2;
private static final String INDEX_ACCOUNT_ENDPOINT =
- Joiner.on("/").join(PLUGINS, PLUGIN_NAME, "index/account", ACCOUNT_NUMBER);
+ Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "index/account", ACCOUNT_NUMBER);
private static final String UUID = "we235jdf92nfj2351";
private static final String INDEX_GROUP_ENDPOINT =
- Joiner.on("/").join(PLUGINS, PLUGIN_NAME, "index/group", UUID);
+ Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "index/group", UUID);
// Event
- private static final String EVENT_ENDPOINT = Joiner.on("/").join(PLUGINS, PLUGIN_NAME, "event");
private static Event event = new Event("test-event") {};
+ private static final String EVENT_ENDPOINT =
+ Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "event", event.type);
private RestForwarder forwarder;
private HttpSession httpSessionMock;
+ @SuppressWarnings("unchecked")
@Before
public void setUp() {
httpSessionMock = mock(HttpSession.class);
Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
when(configMock.http().maxTries()).thenReturn(3);
when(configMock.http().retryInterval()).thenReturn(10);
- forwarder = new RestForwarder(httpSessionMock, PLUGIN_NAME, configMock);
+ Provider<Set<PeerInfo>> peersMock = mock(Provider.class);
+ when(peersMock.get()).thenReturn(ImmutableSet.of(new PeerInfo(URL)));
+ forwarder =
+ new RestForwarder(
+ httpSessionMock, PLUGIN_NAME, configMock, peersMock); // TODO: Create provider
}
@Test
@@ -142,21 +158,21 @@
@Test
public void testChangeDeletedFromIndexOK() throws Exception {
- when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT), any()))
+ when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isTrue();
}
@Test
public void testChangeDeletedFromIndexFailed() throws Exception {
- when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT), any()))
+ when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
}
@Test
public void testChangeDeletedFromThrowsException() throws Exception {
- doThrow(new IOException()).when(httpSessionMock).delete(eq(DELETE_CHANGE_ENDPOINT), any());
+ doThrow(new IOException()).when(httpSessionMock).delete(eq(DELETE_CHANGE_ENDPOINT));
assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
}
@@ -201,8 +217,8 @@
public void testEvictGroupsOK() throws Exception {
AccountGroup.Id key = new AccountGroup.Id(123);
String keyJson = new GsonBuilder().create().toJson(key);
- when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS), keyJson))
- .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
+ String endpoint = buildCacheEndpoint(Constants.GROUPS);
+ when(httpSessionMock.post(endpoint, keyJson)).thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
assertThat(forwarder.evict(Constants.GROUPS, key)).isTrue();
}
@@ -244,13 +260,13 @@
}
private static String buildCacheEndpoint(String name) {
- return Joiner.on("/").join(PLUGINS, PLUGIN_NAME, "cache", name);
+ return Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "cache", name);
}
@Test
public void testAddToProjectListOK() throws Exception {
String projectName = PROJECT_TO_ADD;
- when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName)))
+ when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
assertThat(forwarder.addToProjectList(projectName)).isTrue();
}
@@ -258,7 +274,7 @@
@Test
public void testAddToProjectListFailed() throws Exception {
String projectName = PROJECT_TO_ADD;
- when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName)))
+ when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
assertThat(forwarder.addToProjectList(projectName)).isFalse();
}
@@ -268,7 +284,7 @@
String projectName = PROJECT_TO_ADD;
doThrow(new IOException())
.when(httpSessionMock)
- .post(buildProjectListCacheEndpoint(projectName));
+ .post(buildProjectListCacheEndpoint(projectName), null);
assertThat(forwarder.addToProjectList(projectName)).isFalse();
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 43c2fd7..3ec979c 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -21,9 +21,11 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexAccountTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexChangeTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexGroupTask;
@@ -31,6 +33,7 @@
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.AccountGroup;
import com.google.gerrit.reviewdb.client.Change;
+import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Before;
import org.junit.Test;
@@ -49,17 +52,22 @@
private IndexEventHandler indexEventHandler;
@Mock private Forwarder forwarder;
+ @Mock private ChangeCheckerImpl.Factory changeCheckerFactoryMock;
+ @Mock private ChangeChecker changeCheckerMock;
private Change.Id changeId;
private Account.Id accountId;
private AccountGroup.UUID accountGroupUUID;
@Before
- public void setUpMocks() {
+ public void setUpMocks() throws Exception {
changeId = new Change.Id(CHANGE_ID);
accountId = new Account.Id(ACCOUNT_ID);
accountGroupUUID = new AccountGroup.UUID(UUID);
+ when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
+ when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
indexEventHandler =
- new IndexEventHandler(MoreExecutors.directExecutor(), PLUGIN_NAME, forwarder);
+ new IndexEventHandler(
+ MoreExecutors.directExecutor(), PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
}
@Test
@@ -116,17 +124,19 @@
@Test
public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
- indexEventHandler = new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder);
+ indexEventHandler =
+ new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(poolMock, times(1))
- .execute(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false));
+ .execute(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null));
}
@Test
public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
- indexEventHandler = new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder);
+ indexEventHandler =
+ new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
indexEventHandler.onAccountIndexed(accountId.get());
indexEventHandler.onAccountIndexed(accountId.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
@@ -135,7 +145,8 @@
@Test
public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
- indexEventHandler = new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder);
+ indexEventHandler =
+ new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));
@@ -143,7 +154,8 @@
@Test
public void testIndexChangeTaskToString() throws Exception {
- IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false);
+ IndexChangeTask task =
+ indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null);
assertThat(task.toString())
.isEqualTo(
String.format("[%s] Index change %s in target instance", PLUGIN_NAME, CHANGE_ID));
@@ -166,29 +178,31 @@
@Test
public void testIndexChangeTaskHashCodeAndEquals() {
- IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false);
+ IndexChangeTask task =
+ indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null);
IndexChangeTask sameTask = task;
assertThat(task.equals(sameTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
IndexChangeTask identicalTask =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false);
+ indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null);
assertThat(task.equals(identicalTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
assertThat(task.equals(null)).isFalse();
assertThat(
- task.equals(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID + 1, false)))
+ task.equals(
+ indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID + 1, false, null)))
.isFalse();
assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
IndexChangeTask differentChangeIdTask =
- indexEventHandler.new IndexChangeTask(PROJECT_NAME, 123, false);
+ indexEventHandler.new IndexChangeTask(PROJECT_NAME, 123, false, null);
assertThat(task.equals(differentChangeIdTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(differentChangeIdTask.hashCode());
- IndexChangeTask removeTask = indexEventHandler.new IndexChangeTask("", CHANGE_ID, true);
+ IndexChangeTask removeTask = indexEventHandler.new IndexChangeTask("", CHANGE_ID, true, null);
assertThat(task.equals(removeTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(removeTask.hashCode());
}