Stop occupying threads during retry phase

Before this change, the retry mechanism was implemented in the
way that instead of having a queue with an async executor,
there is a loop with sleep which causes the current thread to block.
This causes threads starvation on the forwarder site and brings down
the Gerrit node. Removing the loop and rescheduling the entire task
allows to unblock the thread and queue the retry which solves the
starvation issue.

Bug: Issue 9941
Change-Id: Ie0fd642a6c1150ea9daa02e9cc346043d0c5bc7f
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
index 49cdc8b..bb47f11 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
@@ -15,6 +15,7 @@
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
 import com.google.gerrit.server.events.Event;
+import java.util.concurrent.CompletableFuture;
 
 /** Forward indexing, stream events and cache evictions to the other master */
 public interface Forwarder {
@@ -24,9 +25,10 @@
    *
    * @param accountId the account to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexAccount(int accountId, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexAccount(int accountId, IndexEvent indexEvent);
 
   /**
    * Forward a change indexing event to the other master.
@@ -34,67 +36,75 @@
    * @param projectName the project of the change to index.
    * @param changeId the change to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexChange(String projectName, int changeId, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexChange(String projectName, int changeId, IndexEvent indexEvent);
 
   /**
    * Forward a delete change from index event to the other master.
    *
    * @param changeId the change to remove from the index.
    * @param indexEvent the details of the index event.
-   * @return rue if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean deleteChangeFromIndex(int changeId, IndexEvent indexEvent);
+  CompletableFuture<Boolean> deleteChangeFromIndex(int changeId, IndexEvent indexEvent);
 
   /**
    * Forward a group indexing event to the other master.
    *
    * @param uuid the group to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexGroup(String uuid, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexGroup(String uuid, IndexEvent indexEvent);
 
   /**
    * Forward a project indexing event to the other master.
    *
    * @param projectName the project to index.
    * @param indexEvent the details of the index event.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean indexProject(String projectName, IndexEvent indexEvent);
+  CompletableFuture<Boolean> indexProject(String projectName, IndexEvent indexEvent);
 
   /**
    * Forward a stream event to the other master.
    *
    * @param event the event to forward.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean send(Event event);
+  CompletableFuture<Boolean> send(Event event);
 
   /**
    * Forward a cache eviction event to the other master.
    *
    * @param cacheName the name of the cache to evict an entry from.
    * @param key the key identifying the entry to evict from the cache.
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean evict(String cacheName, Object key);
+  CompletableFuture<Boolean> evict(String cacheName, Object key);
 
   /**
    * Forward an addition to the project list cache to the other master.
    *
    * @param projectName the name of the project to add to the project list cache
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean addToProjectList(String projectName);
+  CompletableFuture<Boolean> addToProjectList(String projectName);
 
   /**
    * Forward a removal from the project list cache to the other master.
    *
    * @param projectName the name of the project to remove from the project list cache
-   * @return true if successful, otherwise false.
+   * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+   *     false.
    */
-  boolean removeFromProjectList(String projectName);
+  CompletableFuture<Boolean> removeFromProjectList(String projectName);
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index fb3aba9..ef85df8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -28,10 +28,8 @@
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import java.io.IOException;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import javax.net.ssl.SSLException;
 import org.apache.http.HttpException;
 import org.apache.http.client.ClientProtocolException;
@@ -49,6 +47,7 @@
   private final Configuration cfg;
   private final Provider<Set<PeerInfo>> peerInfoProvider;
   private final GsonProvider gson;
+  private final RestForwarderScheduler scheduler;
 
   @Inject
   RestForwarder(
@@ -56,21 +55,24 @@
       @PluginName String pluginName,
       Configuration cfg,
       Provider<Set<PeerInfo>> peerInfoProvider,
-      GsonProvider gson) {
+      GsonProvider gson,
+      RestForwarderScheduler scheduler) {
     this.httpSession = httpClient;
     this.pluginRelativePath = Joiner.on("/").join("plugins", pluginName);
     this.cfg = cfg;
     this.peerInfoProvider = peerInfoProvider;
     this.gson = gson;
+    this.scheduler = scheduler;
   }
 
   @Override
-  public boolean indexAccount(final int accountId, IndexEvent event) {
+  public CompletableFuture<Boolean> indexAccount(final int accountId, IndexEvent event) {
     return execute(RequestMethod.POST, "index account", "index/account", accountId, event);
   }
 
   @Override
-  public boolean indexChange(String projectName, int changeId, IndexEvent event) {
+  public CompletableFuture<Boolean> indexChange(
+      String projectName, int changeId, IndexEvent event) {
     return execute(
         RequestMethod.POST,
         "index change",
@@ -80,13 +82,13 @@
   }
 
   @Override
-  public boolean deleteChangeFromIndex(final int changeId, IndexEvent event) {
+  public CompletableFuture<Boolean> deleteChangeFromIndex(final int changeId, IndexEvent event) {
     return execute(
         RequestMethod.DELETE, "delete change", "index/change", buildIndexEndpoint(changeId), event);
   }
 
   @Override
-  public boolean indexGroup(final String uuid, IndexEvent event) {
+  public CompletableFuture<Boolean> indexGroup(final String uuid, IndexEvent event) {
     return execute(RequestMethod.POST, "index group", "index/group", uuid, event);
   }
 
@@ -100,24 +102,24 @@
   }
 
   @Override
-  public boolean indexProject(String projectName, IndexEvent event) {
+  public CompletableFuture<Boolean> indexProject(String projectName, IndexEvent event) {
     return execute(
         RequestMethod.POST, "index project", "index/project", Url.encode(projectName), event);
   }
 
   @Override
-  public boolean send(final Event event) {
+  public CompletableFuture<Boolean> send(final Event event) {
     return execute(RequestMethod.POST, "send event", "event", event.type, event);
   }
 
   @Override
-  public boolean evict(final String cacheName, final Object key) {
+  public CompletableFuture<Boolean> evict(final String cacheName, final Object key) {
     String json = gson.get().toJson(key);
     return execute(RequestMethod.POST, "invalidate cache " + cacheName, "cache", cacheName, json);
   }
 
   @Override
-  public boolean addToProjectList(String projectName) {
+  public CompletableFuture<Boolean> addToProjectList(String projectName) {
     return execute(
         RequestMethod.POST,
         "Update project_list, add ",
@@ -126,7 +128,7 @@
   }
 
   @Override
-  public boolean removeFromProjectList(String projectName) {
+  public CompletableFuture<Boolean> removeFromProjectList(String projectName) {
     return execute(
         RequestMethod.DELETE,
         "Update project_list, remove ",
@@ -138,19 +140,19 @@
     return Joiner.on("/").join("cache", Constants.PROJECT_LIST);
   }
 
-  private boolean execute(RequestMethod method, String action, String endpoint, Object id) {
+  private CompletableFuture<Boolean> execute(
+      RequestMethod method, String action, String endpoint, Object id) {
     return execute(method, action, endpoint, id, null);
   }
 
-  private boolean execute(
+  private CompletableFuture<Boolean> execute(
       RequestMethod method, String action, String endpoint, Object id, Object payload) {
-    List<CompletableFuture<Boolean>> futures =
-        peerInfoProvider.get().stream()
-            .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
-            .map(request -> CompletableFuture.supplyAsync(request::execute))
-            .collect(Collectors.toList());
-    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
-    return futures.stream().allMatch(CompletableFuture::join);
+    return peerInfoProvider.get().stream()
+        .map(peer -> createRequest(method, peer, action, endpoint, id, payload))
+        .map(scheduler::execute)
+        .reduce(
+            CompletableFuture.completedFuture(true),
+            (a, b) -> a.thenCombine(b, (left, right) -> left && right));
   }
 
   private Request createRequest(
@@ -176,7 +178,7 @@
     };
   }
 
-  private abstract class Request {
+  protected abstract class Request {
     private final String action;
     private final Object key;
     private final String destination;
@@ -189,42 +191,36 @@
       this.destination = destination;
     }
 
-    boolean execute() {
-      log.atFine().log("Executing %s %s towards %s", action, key, destination);
-      for (; ; ) {
-        try {
-          execCnt++;
-          tryOnce();
-          log.atFine().log("%s %s towards %s OK", action, key, destination);
-          return true;
-        } catch (ForwardingException e) {
-          int maxTries = cfg.http().maxTries();
-          log.atFine().withCause(e).log(
-              "Failed to %s %s on %s [%d/%d]", action, key, destination, execCnt, maxTries);
-          if (!e.isRecoverable()) {
-            log.atSevere().withCause(e).log(
-                "%s %s towards %s failed with unrecoverable error; giving up",
-                action, key, destination);
-            return false;
-          }
-          if (execCnt >= maxTries) {
-            log.atSevere().log(
-                "Failed to %s %s on %s after %d tries; giving up",
-                action, key, destination, maxTries);
-            return false;
-          }
+    @Override
+    public String toString() {
+      return String.format("%s:%s => %s (try #%d)", action, key, destination, execCnt);
+    }
 
-          log.atFine().log("Retrying to %s %s on %s", action, key, destination);
-          try {
-            Thread.sleep(cfg.http().retryInterval());
-          } catch (InterruptedException ie) {
-            log.atSevere().withCause(ie).log(
-                "%s %s towards %s was interrupted; giving up", action, key, destination);
-            Thread.currentThread().interrupt();
-            return false;
-          }
+    boolean execute() throws ForwardingException {
+      log.atFine().log("Executing %s %s towards %s", action, key, destination);
+      try {
+        execCnt++;
+        tryOnce();
+        log.atFine().log("%s %s towards %s OK", action, key, destination);
+        return true;
+      } catch (ForwardingException e) {
+        int maxTries = cfg.http().maxTries();
+        log.atFine().withCause(e).log(
+            "Failed to %s %s on %s [%d/%d]", action, key, destination, execCnt, maxTries);
+        if (!e.isRecoverable()) {
+          log.atSevere().withCause(e).log(
+              "%s %s towards %s failed with unrecoverable error; giving up",
+              action, key, destination);
+          throw e;
+        }
+        if (execCnt >= maxTries) {
+          log.atSevere().log(
+              "Failed to %s %s on %s after %d tries; giving up",
+              action, key, destination, maxTries);
+          throw e;
         }
       }
+      return false;
     }
 
     void tryOnce() throws ForwardingException {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
new file mode 100644
index 0000000..ebbd940
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderScheduler.java
@@ -0,0 +1,146 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+@Singleton
+public class RestForwarderScheduler {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+  private final ScheduledExecutorService executor;
+  private final long retryIntervalMs;
+
+  public class CompletablePromise<V> extends CompletableFuture<V> {
+    private Future<V> future;
+
+    public CompletablePromise(Future<V> future) {
+      this.future = future;
+      executor.execute(this::tryToComplete);
+    }
+
+    private void tryToComplete() {
+      if (future.isDone()) {
+        try {
+          complete(future.get());
+        } catch (InterruptedException e) {
+          completeExceptionally(e);
+        } catch (ExecutionException e) {
+          completeExceptionally(e.getCause());
+        }
+        return;
+      }
+
+      if (future.isCancelled()) {
+        cancel(true);
+        return;
+      }
+
+      executor.execute(this::tryToComplete);
+    }
+  }
+
+  @Inject
+  public RestForwarderScheduler(
+      WorkQueue workQueue, Configuration cfg, Provider<Set<PeerInfo>> peerInfoProvider) {
+    int executorSize = peerInfoProvider.get().size() * cfg.index().threadPoolSize();
+    retryIntervalMs = cfg.index().retryInterval();
+    this.executor = workQueue.createQueue(executorSize, "RestForwarderScheduler");
+  }
+
+  @VisibleForTesting
+  public RestForwarderScheduler(ScheduledExecutorService executor) {
+    this.executor = executor;
+    retryIntervalMs = 0;
+  }
+
+  public CompletableFuture<Boolean> execute(RestForwarder.Request request) {
+    return execute(request, 0);
+  }
+
+  public CompletableFuture<Boolean> execute(RestForwarder.Request request, long delayMs) {
+    return supplyAsync(
+        request.toString(),
+        () -> {
+          try {
+            if (!request.execute()) {
+              log.atWarning().log(
+                  "Rescheduling %s for retry after %d msec", request, retryIntervalMs);
+              return execute(request, retryIntervalMs);
+            }
+            return CompletableFuture.completedFuture(true);
+          } catch (ForwardingException e) {
+            log.atSevere().withCause(e).log("Forwarding of %s has failed", request);
+            return CompletableFuture.completedFuture(false);
+          }
+        },
+        executor,
+        delayMs);
+  }
+
+  private CompletableFuture<Boolean> supplyAsync(
+      String taskName,
+      Supplier<CompletableFuture<Boolean>> fn,
+      ScheduledExecutorService executor,
+      long delayMs) {
+    BooleanAsyncSupplier asyncSupplier = new BooleanAsyncSupplier(taskName, fn);
+    executor.schedule(asyncSupplier, delayMs, TimeUnit.MILLISECONDS);
+    return asyncSupplier.future();
+  }
+
+  static class BooleanAsyncSupplier implements Runnable {
+    private CompletableFuture<CompletableFuture<Boolean>> dep;
+    private Supplier<CompletableFuture<Boolean>> fn;
+    private String taskName;
+
+    BooleanAsyncSupplier(String taskName, Supplier<CompletableFuture<Boolean>> fn) {
+      this.taskName = taskName;
+      this.dep = new CompletableFuture<>();
+      this.fn = fn;
+    }
+
+    public CompletableFuture<Boolean> future() {
+      return dep.thenCompose(Function.identity());
+    }
+
+    @Override
+    public void run() {
+      try {
+        dep.complete(fn.get());
+      } catch (Throwable ex) {
+        dep.completeExceptionally(ex);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return taskName;
+    }
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index a352e38..b311db9 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -28,6 +28,7 @@
 import com.google.inject.Inject;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -155,7 +156,7 @@
           this,
           () -> {
             queuedTasks.remove(this);
-            execute();
+            return execute();
           },
           this::reschedule);
     }
@@ -169,7 +170,7 @@
       }
     }
 
-    abstract void execute();
+    abstract CompletableFuture<Boolean> execute();
   }
 
   class IndexChangeTask extends IndexTask {
@@ -183,8 +184,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexChange(projectName, changeId, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexChange(projectName, changeId, indexEvent);
     }
 
     @Override
@@ -216,8 +217,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.deleteChangeFromIndex(changeId, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.deleteChangeFromIndex(changeId, indexEvent);
     }
 
     @Override
@@ -248,8 +249,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexAccount(accountId, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexAccount(accountId, indexEvent);
     }
 
     @Override
@@ -280,8 +281,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexGroup(groupUUID, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexGroup(groupUUID, indexEvent);
     }
 
     @Override
@@ -312,8 +313,8 @@
     }
 
     @Override
-    public void execute() {
-      forwarder.indexProject(projectName, indexEvent);
+    public CompletableFuture<Boolean> execute() {
+      return forwarder.indexProject(projectName, indexEvent);
     }
 
     @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
index 4617e08..4434c34 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -20,6 +20,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
@@ -38,15 +39,16 @@
   }
 
   public void withLock(
-      IndexTask id, VoidFunction function, VoidFunction lockAcquireTimeoutCallback) {
+      IndexTask id, IndexCallFunction function, VoidFunction lockAcquireTimeoutCallback) {
     Lock idLock = getLock(id);
     try {
       if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
-        try {
-          function.invoke();
-        } finally {
-          idLock.unlock();
-        }
+        function
+            .invoke()
+            .whenComplete(
+                (result, error) -> {
+                  idLock.unlock();
+                });
       } else {
         lockAcquireTimeoutCallback.invoke();
       }
@@ -64,4 +66,9 @@
   public interface VoidFunction {
     void invoke();
   }
+
+  @FunctionalInterface
+  public interface IndexCallFunction {
+    CompletableFuture<?> invoke();
+  }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index 55a3c2a..44e6e73 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -37,6 +37,8 @@
 import com.google.inject.Provider;
 import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLException;
 import org.junit.Before;
 import org.junit.Test;
@@ -83,17 +85,22 @@
   private static final String EVENT_ENDPOINT =
       Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "event", event.type);
 
+  private static final long TEST_TIMEOUT = 10;
+  private static final TimeUnit TEST_TIMEOUT_UNITS = TimeUnit.SECONDS;
+
   private RestForwarder forwarder;
   private HttpSession httpSessionMock;
+  private Configuration configMock;
+  Provider<Set<PeerInfo>> peersMock;
 
   @SuppressWarnings("unchecked")
   @Before
   public void setUp() {
     httpSessionMock = mock(HttpSession.class);
-    Configuration configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
+    configMock = mock(Configuration.class, Answers.RETURNS_DEEP_STUBS);
     when(configMock.http().maxTries()).thenReturn(3);
     when(configMock.http().retryInterval()).thenReturn(10);
-    Provider<Set<PeerInfo>> peersMock = mock(Provider.class);
+    peersMock = mock(Provider.class);
     when(peersMock.get()).thenReturn(ImmutableSet.of(new PeerInfo(URL)));
     forwarder =
         new RestForwarder(
@@ -101,106 +108,146 @@
             PLUGIN_NAME,
             configMock,
             peersMock,
-            gsonProvider); // TODO: Create provider
+            gsonProvider, // TODO: Create provider
+            new RestForwarderScheduler(Executors.newScheduledThreadPool(1)));
   }
 
   @Test
   public void testIndexAccountOK() throws Exception {
     when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any()))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isTrue();
+    assertThat(
+            forwarder
+                .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testIndexAccountFailed() throws Exception {
     when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any()))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexAccountThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_ACCOUNT_ENDPOINT), any());
-    assertThat(forwarder.indexAccount(ACCOUNT_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexAccount(ACCOUNT_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexGroupOK() throws Exception {
     when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any()))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isTrue();
+    assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testIndexGroupFailed() throws Exception {
     when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any()))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isFalse();
+    assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexGroupThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_GROUP_ENDPOINT), any());
-    assertThat(forwarder.indexGroup(UUID, new IndexEvent())).isFalse();
+    assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexChangeOK() throws Exception {
     when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any()))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isTrue();
+    assertThat(
+            forwarder
+                .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testIndexChangeFailed() throws Exception {
     when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any()))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testIndexChangeThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_CHANGE_ENDPOINT), any());
-    assertThat(forwarder.indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testChangeDeletedFromIndexOK() throws Exception {
     when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isTrue();
+    assertThat(
+            forwarder
+                .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
   public void testChangeDeletedFromIndexFailed() throws Exception {
     when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testChangeDeletedFromThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).delete(eq(DELETE_CHANGE_ENDPOINT));
-    assertThat(forwarder.deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())).isFalse();
+    assertThat(
+            forwarder
+                .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
   public void testEventSentOK() throws Exception {
     when(httpSessionMock.post(EVENT_ENDPOINT, event))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.send(event)).isTrue();
+    assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isTrue();
   }
 
   @Test
   public void testEventSentFailed() throws Exception {
     when(httpSessionMock.post(EVENT_ENDPOINT, event)).thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.send(event)).isFalse();
+    assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse();
   }
 
   @Test
   public void testEventSentThrowsException() throws Exception {
     doThrow(new IOException()).when(httpSessionMock).post(EVENT_ENDPOINT, event);
-    assertThat(forwarder.send(event)).isFalse();
+    assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse();
   }
 
   @Test
@@ -209,7 +256,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.PROJECTS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -218,7 +266,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.ACCOUNTS), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.ACCOUNTS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.ACCOUNTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -227,7 +276,8 @@
     String keyJson = gson.toJson(key);
     String endpoint = buildCacheEndpoint(Constants.GROUPS);
     when(httpSessionMock.post(endpoint, keyJson)).thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.GROUPS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.GROUPS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -236,7 +286,9 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_BYINCLUDE), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.GROUPS_BYINCLUDE, key)).isTrue();
+    assertThat(
+            forwarder.evict(Constants.GROUPS_BYINCLUDE, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -245,7 +297,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_MEMBERS), keyJson))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key)).isTrue();
+    assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -254,7 +307,8 @@
     String keyJson = gson.toJson(key);
     when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.evict(Constants.PROJECTS, key)).isFalse();
+    assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -264,7 +318,8 @@
     doThrow(new IOException())
         .when(httpSessionMock)
         .post(buildCacheEndpoint(Constants.PROJECTS), keyJson);
-    assertThat(forwarder.evict(Constants.PROJECTS, key)).isFalse();
+    assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   private static String buildCacheEndpoint(String name) {
@@ -276,7 +331,8 @@
     String projectName = PROJECT_TO_ADD;
     when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.addToProjectList(projectName)).isTrue();
+    assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -284,7 +340,8 @@
     String projectName = PROJECT_TO_ADD;
     when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.addToProjectList(projectName)).isFalse();
+    assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -293,7 +350,8 @@
     doThrow(new IOException())
         .when(httpSessionMock)
         .post(buildProjectListCacheEndpoint(projectName), null);
-    assertThat(forwarder.addToProjectList(projectName)).isFalse();
+    assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -301,7 +359,8 @@
     String projectName = PROJECT_TO_DELETE;
     when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName)))
         .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
-    assertThat(forwarder.removeFromProjectList(projectName)).isTrue();
+    assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
@@ -309,7 +368,8 @@
     String projectName = PROJECT_TO_DELETE;
     when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName)))
         .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
-    assertThat(forwarder.removeFromProjectList(projectName)).isFalse();
+    assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
@@ -318,7 +378,8 @@
     doThrow(new IOException())
         .when(httpSessionMock)
         .delete((buildProjectListCacheEndpoint(projectName)));
-    assertThat(forwarder.removeFromProjectList(projectName)).isFalse();
+    assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   private static String buildProjectListCacheEndpoint(String projectName) {
@@ -326,41 +387,57 @@
   }
 
   @Test
-  public void testRetryOnErrorThenSuccess() throws IOException {
+  public void testRetryOnErrorThenSuccess() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(true, SUCCESS));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isTrue();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
-  public void testRetryOnIoExceptionThenSuccess() throws IOException {
+  public void testRetryOnIoExceptionThenSuccess() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenThrow(new IOException())
         .thenThrow(new IOException())
         .thenReturn(new HttpResult(true, SUCCESS));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isTrue();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isTrue();
   }
 
   @Test
-  public void testNoRetryAfterNonRecoverableException() throws IOException {
+  public void testNoRetryAfterNonRecoverableException() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenThrow(new SSLException("Non Recoverable"))
         .thenReturn(new HttpResult(true, SUCCESS));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isFalse();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 
   @Test
-  public void testFailureAfterMaxTries() throws IOException {
+  public void testFailureAfterMaxTries() throws Exception {
     when(httpSessionMock.post(anyString(), anyString()))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(false, ERROR))
         .thenReturn(new HttpResult(false, ERROR));
 
-    assertThat(forwarder.evict(Constants.PROJECT_LIST, new Object())).isFalse();
+    assertThat(
+            forwarder
+                .evict(Constants.PROJECT_LIST, new Object())
+                .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS))
+        .isFalse();
   }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 9a387a7..7097a3a 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -43,6 +43,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -105,6 +106,13 @@
     when(configuration.http()).thenReturn(http);
     when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
     when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
+    when(forwarder.indexAccount(eq(ACCOUNT_ID), any()))
+        .thenReturn(CompletableFuture.completedFuture(true));
+    when(forwarder.deleteChangeFromIndex(eq(CHANGE_ID), any()))
+        .thenReturn(CompletableFuture.completedFuture(true));
+    when(forwarder.indexGroup(eq(UUID), any())).thenReturn(CompletableFuture.completedFuture(true));
+    when(forwarder.indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any()))
+        .thenReturn(CompletableFuture.completedFuture(true));
 
     idLocks = new IndexEventLocks(configuration);
     setUpIndexEventHandler(currCtx);