Merge branch 'stable-3.1' into stable-3.2
* stable-3.1:
Stop occupying threads during retry phase
Fix issue with indexing tasks queue
Change-Id: Ifba0ce701eb1b32385d3eb6783712a71ffca9264
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
index 49cdc8b..bb47f11 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
@@ -15,6 +15,7 @@
package com.ericsson.gerrit.plugins.highavailability.forwarder;
import com.google.gerrit.server.events.Event;
+import java.util.concurrent.CompletableFuture;
/** Forward indexing, stream events and cache evictions to the other master */
public interface Forwarder {
@@ -24,9 +25,10 @@
*
* @param accountId the account to index.
* @param indexEvent the details of the index event.
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean indexAccount(int accountId, IndexEvent indexEvent);
+ CompletableFuture<Boolean> indexAccount(int accountId, IndexEvent indexEvent);
/**
* Forward a change indexing event to the other master.
@@ -34,67 +36,75 @@
* @param projectName the project of the change to index.
* @param changeId the change to index.
* @param indexEvent the details of the index event.
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean indexChange(String projectName, int changeId, IndexEvent indexEvent);
+ CompletableFuture<Boolean> indexChange(String projectName, int changeId, IndexEvent indexEvent);
/**
* Forward a delete change from index event to the other master.
*
* @param changeId the change to remove from the index.
* @param indexEvent the details of the index event.
- * @return rue if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean deleteChangeFromIndex(int changeId, IndexEvent indexEvent);
+ CompletableFuture<Boolean> deleteChangeFromIndex(int changeId, IndexEvent indexEvent);
/**
* Forward a group indexing event to the other master.
*
* @param uuid the group to index.
* @param indexEvent the details of the index event.
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean indexGroup(String uuid, IndexEvent indexEvent);
+ CompletableFuture<Boolean> indexGroup(String uuid, IndexEvent indexEvent);
/**
* Forward a project indexing event to the other master.
*
* @param projectName the project to index.
* @param indexEvent the details of the index event.
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean indexProject(String projectName, IndexEvent indexEvent);
+ CompletableFuture<Boolean> indexProject(String projectName, IndexEvent indexEvent);
/**
* Forward a stream event to the other master.
*
* @param event the event to forward.
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean send(Event event);
+ CompletableFuture<Boolean> send(Event event);
/**
* Forward a cache eviction event to the other master.
*
* @param cacheName the name of the cache to evict an entry from.
* @param key the key identifying the entry to evict from the cache.
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean evict(String cacheName, Object key);
+ CompletableFuture<Boolean> evict(String cacheName, Object key);
/**
* Forward an addition to the project list cache to the other master.
*
* @param projectName the name of the project to add to the project list cache
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean addToProjectList(String projectName);
+ CompletableFuture<Boolean> addToProjectList(String projectName);
/**
* Forward a removal from the project list cache to the other master.
*
* @param projectName the name of the project to remove from the project list cache
- * @return true if successful, otherwise false.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
*/
- boolean removeFromProjectList(String projectName);
+ CompletableFuture<Boolean> removeFromProjectList(String projectName);
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index 3ab9878..d404ab2 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -30,10 +30,8 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.IOException;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import javax.net.ssl.SSLException;
import org.apache.http.HttpException;
import org.apache.http.client.ClientProtocolException;
@@ -51,6 +49,7 @@
private final Configuration cfg;
private final Provider<Set<PeerInfo>> peerInfoProvider;
private final Gson gson;
+ private final RestForwarderScheduler scheduler;
@Inject
RestForwarder(
@@ -58,21 +57,24 @@
@PluginName String pluginName,
Configuration cfg,
Provider<Set<PeerInfo>> peerInfoProvider,
- @EventGson Gson gson) {
+ @EventGson Gson gson,
+ RestForwarderScheduler scheduler) {
this.httpSession = httpClient;
this.pluginRelativePath = Joiner.on("/").join("plugins", pluginName);
this.cfg = cfg;
this.peerInfoProvider = peerInfoProvider;
this.gson = gson;
+ this.scheduler = scheduler;
}
@Override
- public boolean indexAccount(final int accountId, IndexEvent event) {
+ public CompletableFuture<Boolean> indexAccount(final int accountId, IndexEvent event) {
return execute(RequestMethod.POST, "index account", "index/account", accountId, event);
}
@Override
- public boolean indexChange(String projectName, int changeId, IndexEvent event) {
+ public CompletableFuture<Boolean> indexChange(
+ String projectName, int changeId, IndexEvent event) {
return execute(
RequestMethod.POST,
"index change",
@@ -82,13 +84,13 @@
}
@Override
- public boolean deleteChangeFromIndex(final int changeId, IndexEvent event) {
+ public CompletableFuture<Boolean> deleteChangeFromIndex(final int changeId, IndexEvent event) {
return execute(
RequestMethod.DELETE, "delete change", "index/change", buildIndexEndpoint(changeId), event);
}
@Override
- public boolean indexGroup(final String uuid, IndexEvent event) {
+ public CompletableFuture<Boolean> indexGroup(final String uuid, IndexEvent event) {
return execute(RequestMethod.POST, "index group", "index/group", uuid, event);
}
@@ -102,24 +104,24 @@
}
@Override
- public boolean indexProject(String projectName, IndexEvent event) {
+ public CompletableFuture<Boolean> indexProject(String projectName, IndexEvent event) {
return execute(
RequestMethod.POST, "index project", "index/project", Url.encode(projectName), event);
}
@Override
- public boolean send(final Event event) {
+ public CompletableFuture<Boolean> send(final Event event) {
return execute(RequestMethod.POST, "send event", "event", event.type, event);
}
@Override
- public boolean evict(final String cacheName, final Object key) {
+ public CompletableFuture<Boolean> evict(final String cacheName, final Object key) {
String json = gson.toJson(key);
return execute(RequestMethod.POST, "invalidate cache " + cacheName, "cache", cacheName, json);
}
@Override
- public boolean addToProjectList(String projectName) {
+ public CompletableFuture<Boolean> addToProjectList(String projectName) {
return execute(
RequestMethod.POST,
"Update project_list, add ",
@@ -128,7 +130,7 @@
}
@Override
- public boolean removeFromProjectList(String projectName) {
+ public CompletableFuture<Boolean> removeFromProjectList(String projectName) {
return execute(
RequestMethod.DELETE,
"Update project_list, remove ",
@@ -140,19 +142,19 @@
return Joiner.on("/").join("cache", Constants.PROJECT_LIST);
}
- private boolean execute(RequestMethod method, String action, String endpoint, Object id) {
+ private CompletableFuture<Boolean> execute(
+ RequestMethod method, String action, String endpoint, Object id) {
return execute(method, action, endpoint, id, null);
}
- private boolean execute(
+ private CompletableFuture<Boolean> execute(
RequestMethod method, String action, String endpoint, Object id, Object payload) {
- List<CompletableFuture<Boolean>> futures =
- peerInfoProvider.get().stream()
- .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
- .map(request -> CompletableFuture.supplyAsync(request::execute))
- .collect(Collectors.toList());
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
- return futures.stream().allMatch(CompletableFuture::join);
+ return peerInfoProvider.get().stream()
+ .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
+ .map(scheduler::execute)
+ .reduce(
+ CompletableFuture.completedFuture(true),
+ (a, b) -> a.thenCombine(b, (left, right) -> left && right));
}
private Request createRequest(
@@ -178,7 +180,7 @@
};
}
- private abstract class Request {
+ protected abstract class Request {
private final String action;
private final Object key;
private final String destination;
@@ -191,42 +193,36 @@
this.destination = destination;
}
- boolean execute() {
- log.atFine().log("Executing %s %s towards %s", action, key, destination);
- for (; ; ) {
- try {
- execCnt++;
- tryOnce();
- log.atFine().log("%s %s towards %s OK", action, key, destination);
- return true;
- } catch (ForwardingException e) {
- int maxTries = cfg.http().maxTries();
- log.atFine().withCause(e).log(
- "Failed to %s %s on %s [%d/%d]", action, key, destination, execCnt, maxTries);
- if (!e.isRecoverable()) {
- log.atSevere().withCause(e).log(
- "%s %s towards %s failed with unrecoverable error; giving up",
- action, key, destination);
- return false;
- }
- if (execCnt >= maxTries) {
- log.atSevere().log(
- "Failed to %s %s on %s after %d tries; giving up",
- action, key, destination, maxTries);
- return false;
- }
+ @Override
+ public String toString() {
+ return String.format("%s:%s => %s (try #%d)", action, key, destination, execCnt);
+ }
- log.atFine().log("Retrying to %s %s on %s", action, key, destination);
- try {
- Thread.sleep(cfg.http().retryInterval());
- } catch (InterruptedException ie) {
- log.atSevere().withCause(ie).log(
- "%s %s towards %s was interrupted; giving up", action, key, destination);
- Thread.currentThread().interrupt();
- return false;
- }
+ boolean execute() throws ForwardingException {
+ log.atFine().log("Executing %s %s towards %s", action, key, destination);
+ try {
+ execCnt++;
+ tryOnce();
+ log.atFine().log("%s %s towards %s OK", action, key, destination);
+ return true;
+ } catch (ForwardingException e) {
+ int maxTries = cfg.http().maxTries();
+ log.atFine().withCause(e).log(
+ "Failed to %s %s on %s [%d/%d]", action, key, destination, execCnt, maxTries);
+ if (!e.isRecoverable()) {
+ log.atSevere().withCause(e).log(
+ "%s %s towards %s failed with unrecoverable error; giving up",
+ action, key, destination);
+ throw e;
+ }
+ if (execCnt >= maxTries) {
+ log.atSevere().log(
+ "Failed to %s %s on %s after %d tries; giving up",
+ action, key, destination, maxTries);
+ throw e;
}
}
+ return false;
}
void tryOnce() throws ForwardingException {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
new file mode 100644
index 0000000..ebbd940
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
@@ -0,0 +1,146 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+@Singleton
+public class RestForwarderScheduler {
+ private static final FluentLogger log = FluentLogger.forEnclosingClass();
+ private final ScheduledExecutorService executor;
+ private final long retryIntervalMs;
+
+ public class CompletablePromise<V> extends CompletableFuture<V> {
+ private Future<V> future;
+
+ public CompletablePromise(Future<V> future) {
+ this.future = future;
+ executor.execute(this::tryToComplete);
+ }
+
+    private void tryToComplete() {
+      // Cancellation must be checked first: isDone() is also true for a cancelled future.
+      if (future.isCancelled()) {
+        cancel(true);
+        return;
+      }
+      if (future.isDone()) {
+        try {
+          complete(future.get());
+        } catch (InterruptedException e) {
+          completeExceptionally(e);
+        } catch (ExecutionException e) {
+          completeExceptionally(e.getCause());
+        }
+        return;
+      }
+      // Not finished yet: re-submit this check to the executor instead of blocking a thread.
+      executor.execute(this::tryToComplete);
+    }
+ }
+
+ @Inject
+ public RestForwarderScheduler(
+ WorkQueue workQueue, Configuration cfg, Provider<Set<PeerInfo>> peerInfoProvider) {
+ int executorSize = peerInfoProvider.get().size() * cfg.index().threadPoolSize();
+ retryIntervalMs = cfg.index().retryInterval();
+ this.executor = workQueue.createQueue(executorSize, "RestForwarderScheduler");
+ }
+
+ @VisibleForTesting
+ public RestForwarderScheduler(ScheduledExecutorService executor) {
+ this.executor = executor;
+ retryIntervalMs = 0;
+ }
+
+ public CompletableFuture<Boolean> execute(RestForwarder.Request request) {
+ return execute(request, 0);
+ }
+
+ public CompletableFuture<Boolean> execute(RestForwarder.Request request, long delayMs) {
+ return supplyAsync(
+ request.toString(),
+ () -> {
+ try {
+ if (!request.execute()) {
+ log.atWarning().log(
+ "Rescheduling %s for retry after %d msec", request, retryIntervalMs);
+ return execute(request, retryIntervalMs);
+ }
+ return CompletableFuture.completedFuture(true);
+ } catch (ForwardingException e) {
+ log.atSevere().withCause(e).log("Forwarding of %s has failed", request);
+ return CompletableFuture.completedFuture(false);
+ }
+ },
+ executor,
+ delayMs);
+ }
+
+ private CompletableFuture<Boolean> supplyAsync(
+ String taskName,
+ Supplier<CompletableFuture<Boolean>> fn,
+ ScheduledExecutorService executor,
+ long delayMs) {
+ BooleanAsyncSupplier asyncSupplier = new BooleanAsyncSupplier(taskName, fn);
+ executor.schedule(asyncSupplier, delayMs, TimeUnit.MILLISECONDS);
+ return asyncSupplier.future();
+ }
+
+ static class BooleanAsyncSupplier implements Runnable {
+ private CompletableFuture<CompletableFuture<Boolean>> dep;
+ private Supplier<CompletableFuture<Boolean>> fn;
+ private String taskName;
+
+ BooleanAsyncSupplier(String taskName, Supplier<CompletableFuture<Boolean>> fn) {
+ this.taskName = taskName;
+ this.dep = new CompletableFuture<>();
+ this.fn = fn;
+ }
+
+ public CompletableFuture<Boolean> future() {
+ return dep.thenCompose(Function.identity());
+ }
+
+ @Override
+ public void run() {
+ try {
+ dep.complete(fn.get());
+ } catch (Throwable ex) {
+ dep.completeExceptionally(ex);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return taskName;
+ }
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index a352e38..b311db9 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -28,6 +28,7 @@
import com.google.inject.Inject;
import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -155,7 +156,7 @@
this,
() -> {
queuedTasks.remove(this);
- execute();
+ return execute();
},
this::reschedule);
}
@@ -169,7 +170,7 @@
}
}
- abstract void execute();
+ abstract CompletableFuture<Boolean> execute();
}
class IndexChangeTask extends IndexTask {
@@ -183,8 +184,8 @@
}
@Override
- public void execute() {
- forwarder.indexChange(projectName, changeId, indexEvent);
+ public CompletableFuture<Boolean> execute() {
+ return forwarder.indexChange(projectName, changeId, indexEvent);
}
@Override
@@ -216,8 +217,8 @@
}
@Override
- public void execute() {
- forwarder.deleteChangeFromIndex(changeId, indexEvent);
+ public CompletableFuture<Boolean> execute() {
+ return forwarder.deleteChangeFromIndex(changeId, indexEvent);
}
@Override
@@ -248,8 +249,8 @@
}
@Override
- public void execute() {
- forwarder.indexAccount(accountId, indexEvent);
+ public CompletableFuture<Boolean> execute() {
+ return forwarder.indexAccount(accountId, indexEvent);
}
@Override
@@ -280,8 +281,8 @@
}
@Override
- public void execute() {
- forwarder.indexGroup(groupUUID, indexEvent);
+ public CompletableFuture<Boolean> execute() {
+ return forwarder.indexGroup(groupUUID, indexEvent);
}
@Override
@@ -312,8 +313,8 @@
}
@Override
- public void execute() {
- forwarder.indexProject(projectName, indexEvent);
+ public CompletableFuture<Boolean> execute() {
+ return forwarder.indexProject(projectName, indexEvent);
}
@Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
index 4617e08..4434c34 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -20,6 +20,7 @@
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.Striped;
import com.google.inject.Inject;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -38,15 +39,16 @@
}
public void withLock(
- IndexTask id, VoidFunction function, VoidFunction lockAcquireTimeoutCallback) {
+ IndexTask id, IndexCallFunction function, VoidFunction lockAcquireTimeoutCallback) {
Lock idLock = getLock(id);
try {
if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
- try {
- function.invoke();
- } finally {
- idLock.unlock();
- }
+ function
+ .invoke()
+ .whenComplete(
+ (result, error) -> {
+ idLock.unlock();
+ });
} else {
lockAcquireTimeoutCallback.invoke();
}
@@ -64,4 +66,9 @@
public interface VoidFunction {
void invoke();
}
+
+ @FunctionalInterface
+ public interface IndexCallFunction {
+ CompletableFuture<?> invoke();
+ }
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index 9d0d03d..4a25fc8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -36,7 +36,9 @@
.toProvider(ForwardedIndexExecutorProvider.class);
bind(IndexEventLocks.class).in(Scopes.SINGLETON);
listener().to(IndexExecutorProvider.class);
- DynamicSet.bind(binder(), ChangeIndexedListener.class).to(IndexEventHandler.class);
+ DynamicSet.bind(binder(), ChangeIndexedListener.class)
+ .to(IndexEventHandler.class)
+ .in(Scopes.SINGLETON);
DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
DynamicSet.bind(binder(), GroupIndexedListener.class).to(IndexEventHandler.class);
DynamicSet.bind(binder(), ProjectIndexedListener.class).to(IndexEventHandler.class);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index 40ef88e..046d6ad 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -37,6 +37,8 @@
import com.google.inject.Provider;
import java.io.IOException;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.junit.Before;
import org.junit.Test;
@@ -80,121 +82,170 @@
private static final String EVENT_ENDPOINT =
Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "event", event.type);
+ private static final long TEST_TIMEOUT = 10;
+ private static final TimeUnit TEST_TIMEOUT_UNITS = TimeUnit.SECONDS;
+
private RestForwarder forwarder;
private HttpSession httpSessionMock;
private Gson gson = new Gson();
+ private Configuration configMock;
+ Provider<Set<PeerInfo>> peersMock;
@SuppressWarnings("unchecked")
@Before
public void setUp() {
httpSessionMock = mock(HttpSession.class);
- Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
+ configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
when(configMock.http().maxTries()).thenReturn(3);
when(configMock.http().retryInterval()).thenReturn(10);
- Provider<Set<PeerInfo>> peersMock = mock(Provider.class);
+ peersMock = mock(Provider.class);
when(peersMock.get()).thenReturn(ImmutableSet.of(new PeerInfo(URL)));
forwarder =
new RestForwarder(
- httpSessionMock, PLUGIN_NAME, configMock, peersMock, gson); // TODO: Create provider
+ httpSessionMock,
+ PLUGIN_NAME,
+ configMock,
+ peersMock,
+ gson, // TODO: Create provider
+ new RestForwarderScheduler(Executors.newScheduledThreadPool(1)));
}
@Test
public void testIndexAccountOK() throws Exception {
when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any()))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isTrue();
+ assertThat(
+ forwarder
+ .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
public void testIndexAccountFailed() throws Exception {
when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any()))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isFalse();
+ assertThat(
+ forwarder
+ .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testIndexAccountThrowsException() throws Exception {
doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_ACCOUNT_ENDPOINT), any());
- assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isFalse();
+ assertThat(
+ forwarder
+ .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testIndexGroupOK() throws Exception {
when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any()))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isTrue();
+ assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
public void testIndexGroupFailed() throws Exception {
when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any()))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isFalse();
+ assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testIndexGroupThrowsException() throws Exception {
doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_GROUP_ENDPOINT), any());
- assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isFalse();
+ assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testIndexChangeOK() throws Exception {
when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any()))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isTrue();
+ assertThat(
+ forwarder
+ .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
public void testIndexChangeFailed() throws Exception {
when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any()))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isFalse();
+ assertThat(
+ forwarder
+ .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testIndexChangeThrowsException() throws Exception {
doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_CHANGE_ENDPOINT), any());
- assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isFalse();
+ assertThat(
+ forwarder
+ .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testChangeDeletedFromIndexOK() throws Exception {
when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isTrue();
+ assertThat(
+ forwarder
+ .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
public void testChangeDeletedFromIndexFailed() throws Exception {
when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
+ assertThat(
+ forwarder
+ .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testChangeDeletedFromThrowsException() throws Exception {
doThrow(new IOException()).when(httpSessionMock).delete(eq(DELETE_CHANGE_ENDPOINT));
- assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
+ assertThat(
+ forwarder
+ .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
public void testEventSentOK() throws Exception {
when(httpSessionMock.post(EVENT_ENDPOINT, event))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.send(event)).isTrue();
+ assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isTrue();
}
@Test
public void testEventSentFailed() throws Exception {
when(httpSessionMock.post(EVENT_ENDPOINT, event)).thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.send(event)).isFalse();
+ assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse();
}
@Test
public void testEventSentThrowsException() throws Exception {
doThrow(new IOException()).when(httpSessionMock).post(EVENT_ENDPOINT, event);
- assertThat(forwarder.send(event)).isFalse();
+ assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse();
}
@Test
@@ -203,7 +254,8 @@
String keyJson = gson.toJson(key);
when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.evict(Constants.PROJECTS, key)).isTrue();
+ assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -212,7 +264,8 @@
String keyJson = gson.toJson(key);
when(httpSessionMock.post(buildCacheEndpoint(Constants.ACCOUNTS), keyJson))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.evict(Constants.ACCOUNTS, key)).isTrue();
+ assertThat(forwarder.evict(Constants.ACCOUNTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -221,7 +274,8 @@
String keyJson = gson.toJson(key);
String endpoint = buildCacheEndpoint(Constants.GROUPS);
when(httpSessionMock.post(endpoint, keyJson)).thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.evict(Constants.GROUPS, key)).isTrue();
+ assertThat(forwarder.evict(Constants.GROUPS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -230,7 +284,9 @@
String keyJson = gson.toJson(key);
when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_BYINCLUDE), keyJson))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.evict(Constants.GROUPS_BYINCLUDE, key)).isTrue();
+ assertThat(
+ forwarder.evict(Constants.GROUPS_BYINCLUDE, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -239,7 +295,8 @@
String keyJson = gson.toJson(key);
when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_MEMBERS), keyJson))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key)).isTrue();
+ assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -248,7 +305,8 @@
String keyJson = gson.toJson(key);
when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.evict(Constants.PROJECTS, key)).isFalse();
+ assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
@@ -258,7 +316,8 @@
doThrow(new IOException())
.when(httpSessionMock)
.post(buildCacheEndpoint(Constants.PROJECTS), keyJson);
- assertThat(forwarder.evict(Constants.PROJECTS, key)).isFalse();
+ assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
private static String buildCacheEndpoint(String name) {
@@ -270,7 +329,8 @@
String projectName = PROJECT_TO_ADD;
when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.addToProjectList(projectName)).isTrue();
+ assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -278,7 +338,8 @@
String projectName = PROJECT_TO_ADD;
when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.addToProjectList(projectName)).isFalse();
+ assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
@@ -287,7 +348,8 @@
doThrow(new IOException())
.when(httpSessionMock)
.post(buildProjectListCacheEndpoint(projectName), null);
- assertThat(forwarder.addToProjectList(projectName)).isFalse();
+ assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
@@ -295,7 +357,8 @@
String projectName = PROJECT_TO_DELETE;
when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName)))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
- assertThat(forwarder.removeFromProjectList(projectName)).isTrue();
+ assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
@@ -303,7 +366,8 @@
String projectName = PROJECT_TO_DELETE;
when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName)))
.thenReturn(new HttpResult(FAILED, EMPTY_MSG));
- assertThat(forwarder.removeFromProjectList(projectName)).isFalse();
+ assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
@@ -312,7 +376,8 @@
doThrow(new IOException())
.when(httpSessionMock)
.delete((buildProjectListCacheEndpoint(projectName)));
- assertThat(forwarder.removeFromProjectList(projectName)).isFalse();
+ assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
private static String buildProjectListCacheEndpoint(String projectName) {
@@ -320,41 +385,57 @@
}
@Test
- public void testRetryOnErrorThenSuccess() throws IOException {
+ public void testRetryOnErrorThenSuccess() throws Exception {
when(httpSessionMock.post(anyString(), anyString()))
.thenReturn(new HttpResult(false, ERROR))
.thenReturn(new HttpResult(false, ERROR))
.thenReturn(new HttpResult(true, SUCCESS));
- assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isTrue();
+ assertThat(
+ forwarder
+ .evict(Constants.PROJECT_LIST, new Object())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
- public void testRetryOnIoExceptionThenSuccess() throws IOException {
+ public void testRetryOnIoExceptionThenSuccess() throws Exception {
when(httpSessionMock.post(anyString(), anyString()))
.thenThrow(new IOException())
.thenThrow(new IOException())
.thenReturn(new HttpResult(true, SUCCESS));
- assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isTrue();
+ assertThat(
+ forwarder
+ .evict(Constants.PROJECT_LIST, new Object())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isTrue();
}
@Test
- public void testNoRetryAfterNonRecoverableException() throws IOException {
+ public void testNoRetryAfterNonRecoverableException() throws Exception {
when(httpSessionMock.post(anyString(), anyString()))
.thenThrow(new SSLException("Non Recoverable"))
.thenReturn(new HttpResult(true, SUCCESS));
- assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isFalse();
+ assertThat(
+ forwarder
+ .evict(Constants.PROJECT_LIST, new Object())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
@Test
- public void testFailureAfterMaxTries() throws IOException {
+ public void testFailureAfterMaxTries() throws Exception {
when(httpSessionMock.post(anyString(), anyString()))
.thenReturn(new HttpResult(false, ERROR))
.thenReturn(new HttpResult(false, ERROR))
.thenReturn(new HttpResult(false, ERROR));
- assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isFalse();
+ assertThat(
+ forwarder
+ .evict(Constants.PROJECT_LIST, new Object())
+ .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+ .isFalse();
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 9d8f015..db4c613 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -43,6 +43,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -105,6 +106,13 @@
when(configuration.http()).thenReturn(http);
when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
+ when(forwarder.indexAccount(eq(ACCOUNT_ID), any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
+ when(forwarder.deleteChangeFromIndex(eq(CHANGE_ID), any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
+ when(forwarder.indexGroup(eq(UUID), any())).thenReturn(CompletableFuture.completedFuture(true));
+ when(forwarder.indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any()))
+ .thenReturn(CompletableFuture.completedFuture(true));
idLocks = new IndexEventLocks(configuration);
setUpIndexEventHandler(currCtx);