Lock indexing events on forwarder site

Before this change, locking was happening on the receiver site. Long
running indexing tasks can cause socket timeout on forwarder site.
Forwarder uses retry mechanism to deliver the event again. Every retry
operation starts a new thread on receiver site. Because locking
happens on receiver each new event for the same id cause more retries
and more threads waiting to acquire the lock. This cause threads
starvation on the receiver site and brings down the Gerrit node.
Adding locking on the forwarder side allows to create a per id lock for
the indexing operations. Even if we have multiple events for a single
id only one is running, others are waiting in the queue or are dropped.

Bug: Issue 13561
Change-Id: I4c603c8f40e94f543e7dcb7b34159d4ca9df497d
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index 58a21b6..3bd58a3 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -45,6 +45,9 @@
 public class Configuration {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
 
+  public static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
+  public static final int DEFAULT_TIMEOUT_MS = 5000;
+
   // common parameter to peerInfo section
   static final String PEER_INFO_SECTION = "peerInfo";
 
@@ -54,7 +57,6 @@
   static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
   static final String NUM_STRIPED_LOCKS = "numStripedLocks";
-  static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
 
   private final Main main;
   private final AutoReindex autoReindex;
@@ -333,6 +335,9 @@
   }
 
   public static class Http {
+    public static final int DEFAULT_MAX_TRIES = 360;
+    public static final int DEFAULT_RETRY_INTERVAL = 10000;
+
     static final String HTTP_SECTION = "http";
     static final String USER_KEY = "user";
     static final String PASSWORD_KEY = "password";
@@ -341,10 +346,6 @@
     static final String MAX_TRIES_KEY = "maxTries";
     static final String RETRY_INTERVAL_KEY = "retryInterval";
 
-    static final int DEFAULT_TIMEOUT_MS = 5000;
-    static final int DEFAULT_MAX_TRIES = 360;
-    static final int DEFAULT_RETRY_INTERVAL = 10000;
-
     private final String user;
     private final String password;
     private final int connectionTimeout;
@@ -446,6 +447,7 @@
   public static class Index extends Forwarding {
     static final String INDEX_SECTION = "index";
     static final String MAX_TRIES_KEY = "maxTries";
+    static final String WAIT_TIMEOUT_KEY = "waitTimeout";
     static final String RETRY_INTERVAL_KEY = "retryInterval";
     static final String SYNCHRONIZE_FORCED_KEY = "synchronizeForced";
     static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
@@ -453,6 +455,7 @@
     private final int threadPoolSize;
     private final int retryInterval;
     private final int maxTries;
+    private final int waitTimeout;
     private final int numStripedLocks;
     private final boolean synchronizeForced;
 
@@ -462,6 +465,7 @@
       numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
       retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
       maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
+      waitTimeout = getInt(cfg, INDEX_SECTION, WAIT_TIMEOUT_KEY, DEFAULT_TIMEOUT_MS);
       synchronizeForced =
           cfg.getBoolean(INDEX_SECTION, SYNCHRONIZE_FORCED_KEY, DEFAULT_SYNCHRONIZE_FORCED);
     }
@@ -482,6 +486,10 @@
       return maxTries;
     }
 
+    public int waitTimeout() {
+      return waitTimeout;
+    }
+
     public boolean synchronizeForced() {
       return synchronizeForced;
     }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
index 6635b80..8a267d6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Setup.java
@@ -24,6 +24,7 @@
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.CACHE_SECTION;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_THREAD_POOL_SIZE;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT_MS;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.EVENT_SECTION;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.SYNCHRONIZE_KEY;
@@ -33,7 +34,6 @@
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.CONNECTION_TIMEOUT_KEY;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_MAX_TRIES;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_RETRY_INTERVAL;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_TIMEOUT_MS;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.HTTP_SECTION;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.MAX_TRIES_KEY;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.PASSWORD_KEY;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
index bb21f13..f31331d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
@@ -14,7 +14,6 @@
 
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.server.index.account.AccountIndexer;
 import com.google.inject.Inject;
@@ -32,8 +31,7 @@
   private final AccountIndexer indexer;
 
   @Inject
-  ForwardedIndexAccountHandler(AccountIndexer indexer, Configuration config) {
-    super(config.index());
+  ForwardedIndexAccountHandler(AccountIndexer indexer) {
     this.indexer = indexer;
   }
 
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
index 25fc93f..add93d7 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
@@ -55,7 +55,6 @@
       @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
       OneOffRequestContext oneOffCtx,
       ChangeCheckerImpl.Factory changeCheckerFactory) {
-    super(config.index());
     this.indexer = indexer;
     this.indexExecutor = indexExecutor;
     this.oneOffCtx = oneOffCtx;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
index 9272a60..127af36 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
@@ -14,7 +14,6 @@
 
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.google.gerrit.reviewdb.client.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
 import com.google.inject.Inject;
@@ -32,8 +31,7 @@
   private final GroupIndexer indexer;
 
   @Inject
-  ForwardedIndexGroupHandler(GroupIndexer indexer, Configuration config) {
-    super(config.index());
+  ForwardedIndexGroupHandler(GroupIndexer indexer) {
     this.indexer = indexer;
   }
 
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
index 076a44c..b6daf41 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandler.java
@@ -14,7 +14,6 @@
 
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.google.gerrit.index.project.ProjectIndexer;
 import com.google.gerrit.reviewdb.client.Project;
 import com.google.inject.Inject;
@@ -32,8 +31,7 @@
   private final ProjectIndexer indexer;
 
   @Inject
-  ForwardedIndexProjectHandler(ProjectIndexer indexer, Configuration config) {
-    super(config.index());
+  ForwardedIndexProjectHandler(ProjectIndexer indexer) {
     this.indexer = indexer;
   }
 
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
index 48dfba3..9977ff1 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexingHandler.java
@@ -14,12 +14,9 @@
 
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.google.common.flogger.FluentLogger;
-import com.google.common.util.concurrent.Striped;
 import java.io.IOException;
 import java.util.Optional;
-import java.util.concurrent.locks.Lock;
 
 /**
  * Base class to handle forwarded indexing. This class is meant to be extended by classes used on
@@ -40,16 +37,10 @@
     }
   }
 
-  private final Striped<Lock> idLocks;
-
   protected abstract void doIndex(T id, Optional<IndexEvent> indexEvent) throws IOException;
 
   protected abstract void doDelete(T id, Optional<IndexEvent> indexEvent) throws IOException;
 
-  protected ForwardedIndexingHandler(Configuration.Index indexConfig) {
-    idLocks = Striped.lock(indexConfig.numStripedLocks());
-  }
-
   /**
    * Index an item in the local node, indexing will not be forwarded to the other node.
    *
@@ -62,22 +53,16 @@
     log.atFine().log("%s %s %s", operation, id, indexEvent);
     try {
       Context.setForwardedEvent(true);
-      Lock idLock = idLocks.get(id);
-      idLock.lock();
-      try {
-        switch (operation) {
-          case INDEX:
-            doIndex(id, indexEvent);
-            break;
-          case DELETE:
-            doDelete(id, indexEvent);
-            break;
-          default:
-            log.atSevere().log("unexpected operation: %s", operation);
-            break;
-        }
-      } finally {
-        idLock.unlock();
+      switch (operation) {
+        case INDEX:
+          doIndex(id, indexEvent);
+          break;
+        case DELETE:
+          doDelete(id, indexEvent);
+          break;
+        default:
+          log.atSevere().log("unexpected operation: %s", operation);
+          break;
       }
     } finally {
       Context.unsetForwardedEvent();
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 0697ad7..a352e38 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -14,6 +14,7 @@
 
 package com.ericsson.gerrit.plugins.highavailability.index;
 
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
@@ -28,7 +29,8 @@
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 class IndexEventHandler
     implements ChangeIndexedListener,
@@ -36,25 +38,34 @@
         GroupIndexedListener,
         ProjectIndexedListener {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
-  private final Executor executor;
+  private final ScheduledExecutorService executor;
   private final Forwarder forwarder;
   private final String pluginName;
   private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final ChangeCheckerImpl.Factory changeChecker;
   private final CurrentRequestContext currCtx;
+  private final IndexEventLocks locks;
+
+  private final int retryInterval;
+  private final int maxTries;
 
   @Inject
   IndexEventHandler(
-      @IndexExecutor Executor executor,
+      @IndexExecutor ScheduledExecutorService executor,
       @PluginName String pluginName,
       Forwarder forwarder,
       ChangeCheckerImpl.Factory changeChecker,
-      CurrentRequestContext currCtx) {
+      CurrentRequestContext currCtx,
+      Configuration cfg,
+      IndexEventLocks locks) {
     this.forwarder = forwarder;
     this.executor = executor;
     this.pluginName = pluginName;
     this.changeChecker = changeChecker;
     this.currCtx = currCtx;
+    this.locks = locks;
+    this.retryInterval = cfg.http().retryInterval();
+    this.maxTries = cfg.http().maxTries();
   }
 
   @Override
@@ -128,6 +139,7 @@
 
   abstract class IndexTask implements Runnable {
     protected final IndexEvent indexEvent;
+    private int retryCount = 0;
 
     IndexTask() {
       indexEvent = new IndexEvent();
@@ -139,8 +151,22 @@
 
     @Override
     public void run() {
-      queuedTasks.remove(this);
-      execute();
+      locks.withLock(
+          this,
+          () -> {
+            queuedTasks.remove(this);
+            execute();
+          },
+          this::reschedule);
+    }
+
+    private void reschedule() {
+      if (++retryCount <= maxTries) {
+        log.atFine().log("Retrying %d times to %s", retryCount, this);
+        executor.schedule(this, retryInterval, TimeUnit.MILLISECONDS);
+      } else {
+        log.atSevere().log("Failed to %s after %d tries; giving up", this, maxTries);
+      }
     }
 
     abstract void execute();
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
new file mode 100644
index 0000000..4617e08
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexTask;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.flogger.FluentLogger;
+import com.google.common.util.concurrent.Striped;
+import com.google.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+public class IndexEventLocks {
+  private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+  private static final int NUMBER_OF_INDEX_TASK_TYPES = 4;
+
+  private final Striped<Lock> locks;
+  private final long waitTimeout;
+
+  @Inject
+  public IndexEventLocks(Configuration cfg) {
+    this.locks = Striped.lock(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks());
+    this.waitTimeout = cfg.index().waitTimeout();
+  }
+
+  public void withLock(
+      IndexTask id, VoidFunction function, VoidFunction lockAcquireTimeoutCallback) {
+    Lock idLock = getLock(id);
+    try {
+      if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
+        try {
+          function.invoke();
+        } finally {
+          idLock.unlock();
+        }
+      } else {
+        lockAcquireTimeoutCallback.invoke();
+      }
+    } catch (InterruptedException e) {
+      log.atSevere().withCause(e).log("%s was interrupted; giving up", id);
+    }
+  }
+
+  @VisibleForTesting
+  protected Lock getLock(IndexTask id) {
+    return locks.get(id);
+  }
+
+  @FunctionalInterface
+  public interface VoidFunction {
+    void invoke();
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index c17731f..9d0d03d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -20,18 +20,21 @@
 import com.google.gerrit.extensions.events.ProjectIndexedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 public class IndexModule extends LifecycleModule {
 
   @Override
   protected void configure() {
-    bind(Executor.class).annotatedWith(IndexExecutor.class).toProvider(IndexExecutorProvider.class);
+    bind(ScheduledExecutorService.class)
+        .annotatedWith(IndexExecutor.class)
+        .toProvider(IndexExecutorProvider.class);
     bind(ScheduledExecutorService.class)
         .annotatedWith(ForwardedIndexExecutor.class)
         .toProvider(ForwardedIndexExecutorProvider.class);
+    bind(IndexEventLocks.class).in(Scopes.SINGLETON);
     listener().to(IndexExecutorProvider.class);
     DynamicSet.bind(binder(), ChangeIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 611b0e7..0ead279 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -160,8 +160,12 @@
 :   The interval of time in milliseconds between the subsequent auto-retries.
     When not specified, the default value is set to 10000ms.
 
-NOTE: the default settings for `http.timeout` and `http.maxTries` ensure that
-the plugin will keep retrying to forward a message for one hour.
+```index.waitTimeout```
+:   Maximum interval of time in milliseconds the plugin waits to acquire the lock for
+    an indexing call. When not specified, the default value is set to 5000ms.
+
+NOTE: the default settings for `http.timeout`, `http.maxTries` and `http.lockAcquireTimeout`
+ensure that the plugin will keep retrying to forward a message for one hour.
 
 ```cache.synchronize```
 :   Whether to synchronize cache evictions.
@@ -210,6 +214,11 @@
 :   The interval of time in milliseconds between the subsequent auto-retries.
     Defaults to 30000 (30 seconds).
 
+```index.waitTimeout```
+:   Maximum interval of time in milliseconds the plugin waits to acquire
+    the lock for an indexing call. When not specified, the default value
+    is set to 5000ms.
+
 ```websession.synchronize```
 :   Whether to synchronize web sessions.
     Defaults to true.
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
index e28f8c9..a9537bd 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
@@ -18,6 +18,7 @@
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_NUM_STRIPED_LOCKS;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_THREAD_POOL_SIZE;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_TIMEOUT_MS;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Event.EVENT_SECTION;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.DEFAULT_SYNCHRONIZE;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Forwarding.SYNCHRONIZE_KEY;
@@ -27,7 +28,6 @@
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.CONNECTION_TIMEOUT_KEY;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_MAX_TRIES;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_RETRY_INTERVAL;
-import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.DEFAULT_TIMEOUT_MS;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.HTTP_SECTION;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.MAX_TRIES_KEY;
 import static com.ericsson.gerrit.plugins.highavailability.Configuration.Http.PASSWORD_KEY;
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
index b88e9c0..14f3d54 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandlerTest.java
@@ -19,9 +19,7 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.server.index.account.AccountIndexer;
@@ -38,16 +36,12 @@
 public class ForwardedIndexAccountHandlerTest {
 
   @Mock private AccountIndexer indexerMock;
-  @Mock private Configuration configMock;
-  @Mock private Configuration.Index indexMock;
   private ForwardedIndexAccountHandler handler;
   private Account.Id id;
 
   @Before
   public void setUp() throws Exception {
-    when(configMock.index()).thenReturn(indexMock);
-    when(indexMock.numStripedLocks()).thenReturn(10);
-    handler = new ForwardedIndexAccountHandler(indexerMock, configMock);
+    handler = new ForwardedIndexAccountHandler(indexerMock);
     id = new Account.Id(123);
   }
 
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
index df36271..a6da8e2 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -70,7 +70,6 @@
     Change change = new Change(null, id, null, null, TimeUtil.nowTs());
     when(changeNotes.getChange()).thenReturn(change);
     when(configMock.index()).thenReturn(indexMock);
-    when(indexMock.numStripedLocks()).thenReturn(10);
     when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
     handler =
         new ForwardedIndexChangeHandler(
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
index 845abb0..7e30dea 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandlerTest.java
@@ -19,9 +19,7 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
 import com.google.gerrit.reviewdb.client.AccountGroup;
 import com.google.gerrit.server.index.group.GroupIndexer;
@@ -38,16 +36,12 @@
 public class ForwardedIndexGroupHandlerTest {
 
   @Mock private GroupIndexer indexerMock;
-  @Mock private Configuration configMock;
-  @Mock private Configuration.Index indexMock;
   private ForwardedIndexGroupHandler handler;
   private AccountGroup.UUID uuid;
 
   @Before
   public void setUp() throws Exception {
-    when(configMock.index()).thenReturn(indexMock);
-    when(indexMock.numStripedLocks()).thenReturn(10);
-    handler = new ForwardedIndexGroupHandler(indexerMock, configMock);
+    handler = new ForwardedIndexGroupHandler(indexerMock);
     uuid = new AccountGroup.UUID("123");
   }
 
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
index a0c6979..ea36532 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexProjectHandlerTest.java
@@ -19,9 +19,7 @@
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
 import com.google.gerrit.index.project.ProjectIndexer;
 import com.google.gerrit.reviewdb.client.Project;
@@ -38,16 +36,12 @@
 public class ForwardedIndexProjectHandlerTest {
 
   @Mock private ProjectIndexer indexerMock;
-  @Mock private Configuration configMock;
-  @Mock private Configuration.Index indexMock;
   private ForwardedIndexProjectHandler handler;
   private Project.NameKey nameKey;
 
   @Before
   public void setUp() {
-    when(configMock.index()).thenReturn(indexMock);
-    when(indexMock.numStripedLocks()).thenReturn(10);
-    handler = new ForwardedIndexProjectHandler(indexerMock, configMock);
+    handler = new ForwardedIndexProjectHandler(indexerMock);
     nameKey = new Project.NameKey("project/name");
   }
 
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 2d12ca8..9a387a7 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -32,20 +32,31 @@
 import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexAccountTask;
 import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexChangeTask;
 import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexGroupTask;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexProjectTask;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.reviewdb.client.AccountGroup;
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gerrit.server.util.RequestContext;
 import com.google.gerrit.server.util.ThreadLocalRequestContext;
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
 import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -64,7 +75,10 @@
   private Change.Id changeId;
   private Account.Id accountId;
   private AccountGroup.UUID accountGroupUUID;
+  private ScheduledExecutorService executor = new CurrentThreadScheduledExecutorService();
   @Mock private RequestContext mockCtx;
+  @Mock private Configuration configuration;
+  private IndexEventLocks idLocks;
 
   private CurrentRequestContext currCtx =
       new CurrentRequestContext(null, null, null) {
@@ -82,17 +96,41 @@
     when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
     when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
 
+    Configuration.Index cfgIndex = mock(Configuration.Index.class);
+    when(configuration.index()).thenReturn(cfgIndex);
+    when(cfgIndex.numStripedLocks()).thenReturn(Configuration.DEFAULT_NUM_STRIPED_LOCKS);
+    when(cfgIndex.waitTimeout()).thenReturn(Configuration.DEFAULT_TIMEOUT_MS);
+
+    Configuration.Http http = mock(Configuration.Http.class);
+    when(configuration.http()).thenReturn(http);
+    when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
+    when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
+
+    idLocks = new IndexEventLocks(configuration);
     setUpIndexEventHandler(currCtx);
   }
 
   public void setUpIndexEventHandler(CurrentRequestContext currCtx) throws Exception {
+    setUpIndexEventHandler(currCtx, idLocks, configuration);
+  }
+
+  public void setUpIndexEventHandler(CurrentRequestContext currCtx, IndexEventLocks idLocks)
+      throws Exception {
+    setUpIndexEventHandler(currCtx, idLocks, configuration);
+  }
+
+  public void setUpIndexEventHandler(
+      CurrentRequestContext currCtx, IndexEventLocks idLocks, Configuration configuration)
+      throws Exception {
     indexEventHandler =
         new IndexEventHandler(
-            MoreExecutors.directExecutor(),
+            executor,
             PLUGIN_NAME,
             forwarder,
             changeCheckerFactoryMock,
-            currCtx);
+            currCtx,
+            configuration,
+            idLocks);
   }
 
   @Test
@@ -115,6 +153,153 @@
   }
 
   @Test
+  public void shouldNotIndexChangeWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+    verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+  }
+
+  @Test
+  public void shouldNotIndexAccountWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexAccountTask.class))).thenReturn(lock);
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onAccountIndexed(accountId.get());
+
+    verify(forwarder, never()).indexAccount(eq(ACCOUNT_ID), any());
+  }
+
+  @Test
+  public void shouldNotDeleteChangeWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(DeleteChangeTask.class))).thenReturn(lock);
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onChangeDeleted(changeId.get());
+
+    verify(forwarder, never()).deleteChangeFromIndex(eq(CHANGE_ID), any());
+  }
+
+  @Test
+  public void shouldNotIndexGroupWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexGroupTask.class))).thenReturn(lock);
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onGroupIndexed(accountGroupUUID.get());
+
+    verify(forwarder, never()).indexGroup(eq(UUID), any());
+  }
+
+  @Test
+  public void shouldNotIndexProjectWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexProjectTask.class))).thenReturn(lock);
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onProjectIndexed(PROJECT_NAME);
+
+    verify(forwarder, never()).indexProject(eq(PROJECT_NAME), any());
+  }
+
+  @Test
+  public void shouldRetryIndexChangeWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false, true);
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+    verify(locks, times(2)).withLock(any(), any(), any());
+    verify(forwarder, times(1)).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+  }
+
+  @Test
+  public void shouldRetryUpToMaxTriesWhenCannotAcquireLock() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+
+    Configuration cfg = mock(Configuration.class);
+    Configuration.Http httpCfg = mock(Configuration.Http.class);
+    when(httpCfg.maxTries()).thenReturn(10);
+    when(cfg.http()).thenReturn(httpCfg);
+    setUpIndexEventHandler(currCtx, locks, cfg);
+
+    indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+    verify(locks, times(11)).withLock(any(), any(), any());
+    verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+  }
+
+  @Test
+  public void shouldNotRetryWhenMaxTriesLowerThanOne() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock lock = mock(Lock.class);
+    when(locks.getLock(any(IndexChangeTask.class))).thenReturn(lock);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+
+    Configuration cfg = mock(Configuration.class);
+    Configuration.Http httpCfg = mock(Configuration.Http.class);
+    when(httpCfg.maxTries()).thenReturn(0);
+    when(cfg.http()).thenReturn(httpCfg);
+    setUpIndexEventHandler(currCtx, locks, cfg);
+
+    indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+
+    verify(locks, times(1)).withLock(any(), any(), any());
+    verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+  }
+
+  @Test
+  public void shouldLockPerIndexEventType() throws Exception {
+    IndexEventLocks locks = mock(IndexEventLocks.class);
+    Lock indexChangeLock = mock(Lock.class);
+    when(indexChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
+    Lock accountChangeLock = mock(Lock.class);
+    when(accountChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(true);
+    when(locks.getLock(any(IndexChangeTask.class))).thenReturn(indexChangeLock);
+    when(locks.getLock(any(IndexAccountTask.class))).thenReturn(accountChangeLock);
+    Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
+    setUpIndexEventHandler(currCtx, locks);
+
+    indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
+    indexEventHandler.onAccountIndexed(accountId.get());
+
+    verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
+    verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
+  }
+
+  @Test
   public void shouldReindexInRemoteWhenContextIsMissingButForcedIndexingEnabled() throws Exception {
     ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
     OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
@@ -179,7 +364,14 @@
   public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
     ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
     indexEventHandler =
-        new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
+        new IndexEventHandler(
+            poolMock,
+            PLUGIN_NAME,
+            forwarder,
+            changeCheckerFactoryMock,
+            currCtx,
+            configuration,
+            idLocks);
     indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
     indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
     verify(poolMock, times(1))
@@ -190,7 +382,14 @@
   public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
     ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
     indexEventHandler =
-        new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
+        new IndexEventHandler(
+            poolMock,
+            PLUGIN_NAME,
+            forwarder,
+            changeCheckerFactoryMock,
+            currCtx,
+            configuration,
+            idLocks);
     indexEventHandler.onAccountIndexed(accountId.get());
     indexEventHandler.onAccountIndexed(accountId.get());
     verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
@@ -200,7 +399,14 @@
   public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
     ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
     indexEventHandler =
-        new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock, currCtx);
+        new IndexEventHandler(
+            poolMock,
+            PLUGIN_NAME,
+            forwarder,
+            changeCheckerFactoryMock,
+            currCtx,
+            configuration,
+            idLocks);
     indexEventHandler.onGroupIndexed(accountGroupUUID.get());
     indexEventHandler.onGroupIndexed(accountGroupUUID.get());
     verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));
@@ -316,4 +522,98 @@
     assertThat(task.equals(differentGroupIdTask)).isFalse();
     assertThat(task.hashCode()).isNotEqualTo(differentGroupIdTask.hashCode());
   }
+
+  private class CurrentThreadScheduledExecutorService implements ScheduledExecutorService {
+
+    @Override
+    public void shutdown() {}
+
+    @Override
+    public List<Runnable> shutdownNow() {
+      return null;
+    }
+
+    @Override
+    public boolean isShutdown() {
+      return false;
+    }
+
+    @Override
+    public boolean isTerminated() {
+      return false;
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+      return false;
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      return null;
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+      return null;
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+      return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException {
+      return null;
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException {
+      return null;
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+        throws InterruptedException, ExecutionException {
+      return null;
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      return null;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      command.run();
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+      command.run();
+      return null;
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+      return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+        Runnable command, long initialDelay, long period, TimeUnit unit) {
+      return null;
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+        Runnable command, long initialDelay, long delay, TimeUnit unit) {
+      return null;
+    }
+  }
 }