Merge branch 'stable-3.1' into stable-3.2

* stable-3.1:
  Stop occupying threads during retry phase
  Fix issue with indexing tasks queue

Change-Id: Ifba0ce701eb1b32385d3eb6783712a71ffca9264
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
index 49cdc8b..bb47f11 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
@@ -15,6 +15,7 @@
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
 import com.google.gerrit.server.events.Event;
+import java.util.concurrent.CompletableFuture;
 
 /** Forward indexing, stream events and cache evictions to the other master */
 public interface Forwarder {
@@ -24,9 +25,10 @@
    *
    * @param accountId the account to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexAccount(int accountId, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexAccount(int accountId, IndexEvent indexEvent);
 
   /**
    * Forward a change indexing event to the other master.
@@ -34,67 +36,75 @@
    * @param projectName the project of the change to index.
    * @param changeId the change to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexChange(String projectName, int changeId, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexChange(String projectName, int changeId, IndexEvent indexEvent);
 
   /**
    * Forward a delete change from index event to the other master.
    *
    * @param changeId the change to remove from the index.
    * @param indexEvent the details of the index event.
-   * @return rue if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean deleteChangeFromIndex(int changeId, IndexEvent indexEvent);
+  CompletableFuture<Boolean> deleteChangeFromIndex(int changeId, IndexEvent indexEvent);
 
   /**
    * Forward a group indexing event to the other master.
    *
    * @param uuid the group to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexGroup(String uuid, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexGroup(String uuid, IndexEvent indexEvent);
 
   /**
    * Forward a project indexing event to the other master.
    *
    * @param projectName the project to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexProject(String projectName, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexProject(String projectName, IndexEvent indexEvent);
 
   /**
    * Forward a stream event to the other master.
    *
    * @param event the event to forward.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean send(Event event);
+  CompletableFuture<Boolean> send(Event event);
 
   /**
    * Forward a cache eviction event to the other master.
    *
    * @param cacheName the name of the cache to evict an entry from.
    * @param key the key identifying the entry to evict from the cache.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean evict(String cacheName, Object key);
+  CompletableFuture<Boolean> evict(String cacheName, Object key);
 
   /**
    * Forward an addition to the project list cache to the other master.
    *
    * @param projectName the name of the project to add to the project list cache
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean addToProjectList(String projectName);
+  CompletableFuture<Boolean> addToProjectList(String projectName);
 
   /**
    * Forward a removal from the project list cache to the other master.
    *
    * @param projectName the name of the project to remove from the project list cache
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean removeFromProjectList(String projectName);
+  CompletableFuture<Boolean> removeFromProjectList(String projectName);
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index 3ab9878..d404ab2 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -30,10 +30,8 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import java.io.IOException;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import javax.net.ssl.SSLException;
 import org.apache.http.HttpException;
 import org.apache.http.client.ClientProtocolException;
@@ -51,6 +49,7 @@
   private final Configuration cfg;
   private final Provider<Set<PeerInfo>> peerInfoProvider;
   private final Gson gson;
+  private final RestForwarderScheduler scheduler;
 
   @Inject
   RestForwarder(
@@ -58,21 +57,24 @@
       @PluginName String pluginName,
       Configuration cfg,
       Provider<Set<PeerInfo>> peerInfoProvider,
-      @EventGson Gson gson) {
+      @EventGson Gson gson,
+      RestForwarderScheduler scheduler) {
     this.httpSession = httpClient;
     this.pluginRelativePath = Joiner.on("/").join("plugins", pluginName);
     this.cfg = cfg;
     this.peerInfoProvider = peerInfoProvider;
     this.gson = gson;
+    this.scheduler = scheduler;
   }
 
   @Override
-  public boolean indexAccount(final int accountId, IndexEvent event) {
+  public CompletableFuture<Boolean> indexAccount(final int accountId, IndexEvent event) {
     return execute(RequestMethod.POST, "index account", "index/account", accountId, event);
   }
 
   @Override
-  public boolean indexChange(String projectName, int changeId, IndexEvent event) {
+  public CompletableFuture<Boolean> indexChange(
+      String projectName, int changeId, IndexEvent event) {
     return execute(
         RequestMethod.POST,
         "index change",
@@ -82,13 +84,13 @@
   }
 
   @Override
-  public boolean deleteChangeFromIndex(final int changeId, IndexEvent event) {
+  public CompletableFuture<Boolean> deleteChangeFromIndex(final int changeId, IndexEvent event) {
     return execute(
         RequestMethod.DELETE, "delete change", "index/change", buildIndexEndpoint(changeId), event);
   }
 
   @Override
-  public boolean indexGroup(final String uuid, IndexEvent event) {
+  public CompletableFuture<Boolean> indexGroup(final String uuid, IndexEvent event) {
     return execute(RequestMethod.POST, "index group", "index/group", uuid, event);
   }
 
@@ -102,24 +104,24 @@
   }
 
   @Override
-  public boolean indexProject(String projectName, IndexEvent event) {
+  public CompletableFuture<Boolean> indexProject(String projectName, IndexEvent event) {
     return execute(
         RequestMethod.POST, "index project", "index/project", Url.encode(projectName), event);
   }
 
   @Override
-  public boolean send(final Event event) {
+  public CompletableFuture<Boolean> send(final Event event) {
     return execute(RequestMethod.POST, "send event", "event", event.type, event);
   }
 
   @Override
-  public boolean evict(final String cacheName, final Object key) {
+  public CompletableFuture<Boolean> evict(final String cacheName, final Object key) {
     String json = gson.toJson(key);
     return execute(RequestMethod.POST, "invalidate cache " + cacheName, "cache", cacheName, json);
   }
 
   @Override
-  public boolean addToProjectList(String projectName) {
+  public CompletableFuture<Boolean> addToProjectList(String projectName) {
     return execute(
         RequestMethod.POST,
         "Update project_list, add ",
@@ -128,7 +130,7 @@
   }
 
   @Override
-  public boolean removeFromProjectList(String projectName) {
+  public CompletableFuture<Boolean> removeFromProjectList(String projectName) {
     return execute(
         RequestMethod.DELETE,
         "Update project_list, remove ",
@@ -140,19 +142,19 @@
     return Joiner.on("/").join("cache", Constants.PROJECT_LIST);
   }
 
-  private boolean execute(RequestMethod method, String action, String endpoint, Object id) {
+  private CompletableFuture<Boolean> execute(
+      RequestMethod method, String action, String endpoint, Object id) {
     return execute(method, action, endpoint, id, null);
   }
 
-  private boolean execute(
+  private CompletableFuture<Boolean> execute(
       RequestMethod method, String action, String endpoint, Object id, Object payload) {
-    List<CompletableFuture<Boolean>> futures =
-        peerInfoProvider.get().stream()
-            .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
-            .map(request -> CompletableFuture.supplyAsync(request::execute))
-            .collect(Collectors.toList());
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
-    return futures.stream().allMatch(CompletableFuture::join);
+    return peerInfoProvider.get().stream()
+        .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
+        .map(scheduler::execute)
+        .reduce(
+            CompletableFuture.completedFuture(true),
+            (a, b) -> a.thenCombine(b, (left, right) -> left && right));
   }
 
   private Request createRequest(
@@ -178,7 +180,7 @@
     };
   }
 
-  private abstract class Request {
+  protected abstract class Request {
     private final String action;
     private final Object key;
     private final String destination;
@@ -191,42 +193,36 @@
       this.destination = destination;
     }
 
-    boolean execute() {
-      log.atFine().log("Executing %s %s towards %s", action, key, destination);
-      for (; ; ) {
-        try {
-          execCnt++;
-          tryOnce();
-          log.atFine().log("%s %s towards %s OK", action, key, destination);
-          return true;
-        } catch (ForwardingException e) {
-          int maxTries = cfg.http().maxTries();
-          log.atFine().withCause(e).log(
-              "Failed to %s %s on %s [%d/%d]", action, key, destination, execCnt, maxTries);
-          if (!e.isRecoverable()) {
-            log.atSevere().withCause(e).log(
-                "%s %s towards %s failed with unrecoverable error; giving up",
-                action, key, destination);
-            return false;
-          }
-          if (execCnt >= maxTries) {
-            log.atSevere().log(
-                "Failed to %s %s on %s after %d tries; giving up",
-                action, key, destination, maxTries);
-            return false;
-          }
+    @Override
+    public String toString() {
+      return String.format("%s:%s => %s (try #%d)", action, key, destination, execCnt);
+    }
 
-          log.atFine().log("Retrying to %s %s on %s", action, key, destination);
-          try {
-            Thread.sleep(cfg.http().retryInterval());
-          } catch (InterruptedException ie) {
-            log.atSevere().withCause(ie).log(
-                "%s %s towards %s was interrupted; giving up", action, key, destination);
-            Thread.currentThread().interrupt();
-            return false;
-          }
+    boolean execute() throws ForwardingException {
+      log.atFine().log("Executing %s %s towards %s", action, key, destination);
+      try {
+        execCnt++;
+        tryOnce();
+        log.atFine().log("%s %s towards %s OK", action, key, destination);
+        return true;
+      } catch (ForwardingException e) {
+        int maxTries = cfg.http().maxTries();
+        log.atFine().withCause(e).log(
+            "Failed to %s %s on %s [%d/%d]", action, key, destination, execCnt, maxTries);
+        if (!e.isRecoverable()) {
+          log.atSevere().withCause(e).log(
+              "%s %s towards %s failed with unrecoverable error; giving up",
+              action, key, destination);
+          throw e;
+        }
+        if (execCnt >= maxTries) {
+          log.atSevere().log(
+              "Failed to %s %s on %s after %d tries; giving up",
+              action, key, destination, maxTries);
+          throw e;
         }
       }
+      return false;
     }
 
     void tryOnce() throws ForwardingException {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
new file mode 100644
index 0000000..ebbd940
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
@@ -0,0 +1,146 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+@Singleton
+public class RestForwarderScheduler {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+  private final ScheduledExecutorService executor;
+  private final long retryIntervalMs;
+
+  public class CompletablePromise<V> extends CompletableFuture<V> {
+    private Future<V> future;
+
+    public CompletablePromise(Future<V> future) {
+      this.future = future;
+      executor.execute(this::tryToComplete);
+    }
+
+    private void tryToComplete() {
+      if (future.isDone()) {
+        try {
+          complete(future.get());
+        } catch (InterruptedException e) {
+          completeExceptionally(e);
+        } catch (ExecutionException e) {
+          completeExceptionally(e.getCause());
+        }
+        return;
+      }
+
+      if (future.isCancelled()) {
+        cancel(true);
+        return;
+      }
+
+      executor.execute(this::tryToComplete);
+    }
+  }
+
+  @Inject
+  public RestForwarderScheduler(
+      WorkQueue workQueue, Configuration cfg, Provider<Set<PeerInfo>> peerInfoProvider) {
+    int executorSize = peerInfoProvider.get().size() * cfg.index().threadPoolSize();
+    retryIntervalMs = cfg.index().retryInterval();
+    this.executor = workQueue.createQueue(executorSize, "RestForwarderScheduler");
+  }
+
+  @VisibleForTesting
+  public RestForwarderScheduler(ScheduledExecutorService executor) {
+    this.executor = executor;
+    retryIntervalMs = 0;
+  }
+
+  public CompletableFuture<Boolean> execute(RestForwarder.Request request) {
+    return execute(request, 0);
+  }
+
+  public CompletableFuture<Boolean> execute(RestForwarder.Request request, long delayMs) {
+    return supplyAsync(
+        request.toString(),
+        () -> {
+          try {
+            if (!request.execute()) {
+              log.atWarning().log(
+                  "Rescheduling %s for retry after %d msec", request, retryIntervalMs);
+              return execute(request, retryIntervalMs);
+            }
+            return CompletableFuture.completedFuture(true);
+          } catch (ForwardingException e) {
+            log.atSevere().withCause(e).log("Forwarding of %s has failed", request);
+            return CompletableFuture.completedFuture(false);
+          }
+        },
+        executor,
+        delayMs);
+  }
+
+  private CompletableFuture<Boolean> supplyAsync(
+      String taskName,
+      Supplier<CompletableFuture<Boolean>> fn,
+      ScheduledExecutorService executor,
+      long delayMs) {
+    BooleanAsyncSupplier asyncSupplier = new BooleanAsyncSupplier(taskName, fn);
+    executor.schedule(asyncSupplier, delayMs, TimeUnit.MILLISECONDS);
+    return asyncSupplier.future();
+  }
+
+  static class BooleanAsyncSupplier implements Runnable {
+    private CompletableFuture<CompletableFuture<Boolean>> dep;
+    private Supplier<CompletableFuture<Boolean>> fn;
+    private String taskName;
+
+    BooleanAsyncSupplier(String taskName, Supplier<CompletableFuture<Boolean>> fn) {
+      this.taskName = taskName;
+      this.dep = new CompletableFuture<>();
+      this.fn = fn;
+    }
+
+    public CompletableFuture<Boolean> future() {
+      return dep.thenCompose(Function.identity());
+    }
+
+    @Override
+    public void run() {
+      try {
+        dep.complete(fn.get());
+      } catch (Throwable ex) {
+        dep.completeExceptionally(ex);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return taskName;
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index a352e38..b311db9 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -28,6 +28,7 @@
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -155,7 +156,7 @@
           this,
           () -> {
             queuedTasks.remove(this);
-            execute();
+            return execute();
           },
           this::reschedule);
     }
@@ -169,7 +170,7 @@
       }
     }
 
-    abstract void execute();
+    abstract CompletableFuture<Boolean> execute();
   }
 
   class IndexChangeTask extends IndexTask {
@@ -183,8 +184,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexChange(projectName, changeId, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexChange(projectName, changeId, indexEvent);
     }
 
     @Override
@@ -216,8 +217,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.deleteChangeFromIndex(changeId, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.deleteChangeFromIndex(changeId, indexEvent);
     }
 
     @Override
@@ -248,8 +249,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexAccount(accountId, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexAccount(accountId, indexEvent);
     }
 
     @Override
@@ -280,8 +281,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexGroup(groupUUID, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexGroup(groupUUID, indexEvent);
     }
 
     @Override
@@ -312,8 +313,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexProject(projectName, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexProject(projectName, indexEvent);
     }
 
     @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
index 4617e08..4434c34 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -20,6 +20,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
@@ -38,15 +39,16 @@
   }
 
   public void withLock(
-      IndexTask id, VoidFunction function, VoidFunction lockAcquireTimeoutCallback) {
+      IndexTask id, IndexCallFunction function, VoidFunction lockAcquireTimeoutCallback) {
     Lock idLock = getLock(id);
     try {
       if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
-        try {
-          function.invoke();
-        } finally {
-          idLock.unlock();
-        }
+        function
+            .invoke()
+            .whenComplete(
+                (result, error) -> {
+                  idLock.unlock();
+                });
       } else {
         lockAcquireTimeoutCallback.invoke();
       }
@@ -64,4 +66,9 @@
   public interface VoidFunction {
     void invoke();
   }
+
+  @FunctionalInterface
+  public interface IndexCallFunction {
+    CompletableFuture<?> invoke();
+  }
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index 9d0d03d..4a25fc8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -36,7 +36,9 @@
         .toProvider(ForwardedIndexExecutorProvider.class);
     bind(IndexEventLocks.class).in(Scopes.SINGLETON);
     listener().to(IndexExecutorProvider.class);
-    DynamicSet.bind(binder(), ChangeIndexedListener.class).to(IndexEventHandler.class);
+    DynamicSet.bind(binder(), ChangeIndexedListener.class)
+        .to(IndexEventHandler.class)
+        .in(Scopes.SINGLETON);
     DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), GroupIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), ProjectIndexedListener.class).to(IndexEventHandler.class);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index 40ef88e..046d6ad 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -37,6 +37,8 @@
 import com.google.inject.Provider;
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLException;
 import org.junit.Before;
 import org.junit.Test;
@@ -80,121 +82,170 @@
   private static final String EVENT_ENDPOINT =
       Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "event", event.type);
 
+  private static final long TEST_TIMEOUT = 10;
+  private static final TimeUnit TEST_TIMEOUT_UNITS = TimeUnit.SECONDS;
+
   private RestForwarder forwarder;
   private HttpSession httpSessionMock;
   private Gson gson = new Gson();
+  private Configuration configMock;
+  Provider<Set<PeerInfo>> peersMock;
 
   @SuppressWarnings("unchecked")
   @Before
   public void setUp() {
     httpSessionMock = mock(HttpSession.class);
-    Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
+    configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
     when(configMock.http().maxTries()).thenReturn(3);
     when(configMock.http().retryInterval()).thenReturn(10);
-    Provider<Set<PeerInfo>> peersMock = mock(Provider.class);
+    peersMock = mock(Provider.class);
     when(peersMock.get()).thenReturn(ImmutableSet.of(new PeerInfo(URL)));
     forwarder =
         new RestForwarder(
-            httpSessionMock, PLUGIN_NAME, configMock, peersMock, gson); // TODO: Create provider
+            httpSessionMock,
+            PLUGIN_NAME,
+            configMock,
+            peersMock,
+            gson, // TODO: Create provider
+            new RestForwarderScheduler(Executors.newScheduledThreadPool(1)));
   }
 
   @Test
   public void testIndexAccountOK() throws Exception {
     when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any()))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isTrue();
+    assertThat(
+            forwarder
+                .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testIndexAccountFailed() throws Exception {
     when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any()))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexAccountThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_ACCOUNT_ENDPOINT), any());
-    assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexGroupOK() throws Exception {
     when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any()))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isTrue();
+    assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testIndexGroupFailed() throws Exception {
     when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any()))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isFalse();
+    assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexGroupThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_GROUP_ENDPOINT), any());
-    assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isFalse();
+    assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexChangeOK() throws Exception {
     when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any()))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isTrue();
+    assertThat(
+            forwarder
+                .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testIndexChangeFailed() throws Exception {
     when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any()))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexChangeThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_CHANGE_ENDPOINT), any());
-    assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testChangeDeletedFromIndexOK() throws Exception {
     when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isTrue();
+    assertThat(
+            forwarder
+                .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testChangeDeletedFromIndexFailed() throws Exception {
     when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testChangeDeletedFromThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).delete(eq(DELETE_CHANGE_ENDPOINT));
-    assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testEventSentOK() throws Exception {
     when(httpSessionMock.post(EVENT_ENDPOINT, event))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.send(event)).isTrue();
+    assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isTrue();
   }
 
   @Test
   public void testEventSentFailed() throws Exception {
     when(httpSessionMock.post(EVENT_ENDPOINT, event)).thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.send(event)).isFalse();
+    assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse();
   }
 
   @Test
   public void testEventSentThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(EVENT_ENDPOINT, event);
-    assertThat(forwarder.send(event)).isFalse();
+    assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse();
   }
 
   @Test
@@ -203,7 +254,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.PROJECTS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -212,7 +264,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.ACCOUNTS), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.ACCOUNTS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.ACCOUNTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -221,7 +274,8 @@
     String keyJson = gson.toJson(key);
     String endpoint = buildCacheEndpoint(Constants.GROUPS);
     when(httpSessionMock.post(endpoint, keyJson)).thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.GROUPS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.GROUPS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -230,7 +284,9 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_BYINCLUDE), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.GROUPS_BYINCLUDE, key)).isTrue();
+    assertThat(
+            forwarder.evict(Constants.GROUPS_BYINCLUDE, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -239,7 +295,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_MEMBERS), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -248,7 +305,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.PROJECTS, key)).isFalse();
+    assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -258,7 +316,8 @@
     doThrow(new IOException())
         .when(httpSessionMock)
         .post(buildCacheEndpoint(Constants.PROJECTS), keyJson);
-    assertThat(forwarder.evict(Constants.PROJECTS, key)).isFalse();
+    assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   private static String buildCacheEndpoint(String name) {
@@ -270,7 +329,8 @@
     String projectName = PROJECT_TO_ADD;
     when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.addToProjectList(projectName)).isTrue();
+    assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -278,7 +338,8 @@
     String projectName = PROJECT_TO_ADD;
     when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.addToProjectList(projectName)).isFalse();
+    assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -287,7 +348,8 @@
     doThrow(new IOException())
         .when(httpSessionMock)
         .post(buildProjectListCacheEndpoint(projectName), null);
-    assertThat(forwarder.addToProjectList(projectName)).isFalse();
+    assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -295,7 +357,8 @@
     String projectName = PROJECT_TO_DELETE;
     when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName)))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.removeFromProjectList(projectName)).isTrue();
+    assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -303,7 +366,8 @@
     String projectName = PROJECT_TO_DELETE;
     when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName)))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.removeFromProjectList(projectName)).isFalse();
+    assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -312,7 +376,8 @@
     doThrow(new IOException())
         .when(httpSessionMock)
         .delete((buildProjectListCacheEndpoint(projectName)));
-    assertThat(forwarder.removeFromProjectList(projectName)).isFalse();
+    assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   private static String buildProjectListCacheEndpoint(String projectName) {
@@ -320,41 +385,57 @@
   }
 
   @Test
-  public void testRetryOnErrorThenSuccess() throws IOException {
+  public void testRetryOnErrorThenSuccess() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(true, SUCCESS));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isTrue();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
-  public void testRetryOnIoExceptionThenSuccess() throws IOException {
+  public void testRetryOnIoExceptionThenSuccess() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenThrow(new IOException())
         .thenThrow(new IOException())
         .thenReturn(new HttpResult(true, SUCCESS));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isTrue();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
-  public void testNoRetryAfterNonRecoverableException() throws IOException {
+  public void testNoRetryAfterNonRecoverableException() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenThrow(new SSLException("Non Recoverable"))
         .thenReturn(new HttpResult(true, SUCCESS));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isFalse();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
-  public void testFailureAfterMaxTries() throws IOException {
+  public void testFailureAfterMaxTries() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(false, ERROR));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isFalse();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 9d8f015..db4c613 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -43,6 +43,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -105,6 +106,13 @@
     when(configuration.http()).thenReturn(http);
     when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
     when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
+    when(forwarder.indexAccount(eq(ACCOUNT_ID), any()))
+        .thenReturn(CompletableFuture.completedFuture(true));
+    when(forwarder.deleteChangeFromIndex(eq(CHANGE_ID), any()))
+        .thenReturn(CompletableFuture.completedFuture(true));
+    when(forwarder.indexGroup(eq(UUID), any())).thenReturn(CompletableFuture.completedFuture(true));
+    when(forwarder.indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any()))
+        .thenReturn(CompletableFuture.completedFuture(true));
 
     idLocks = new IndexEventLocks(configuration);
     setUpIndexEventHandler(currCtx);