Fix indexing locks on the sender side

Replace locks with semaphores when blocking multiple
operations on the sender side for the same index name + id.

Using native locks would require to have exactly
the same thread holding the lock, which may or may not
be the case for forwarding a remote indexing operation.

When running a remote indexing, the operation may be
retried multiple times in different threads, hence the use
of a Semaphore instead of a Lock is more appropriate.
When remote indexing is not retried, the execution is
delegated to a REST forwarder executor that makes the remote
invocation and returns the result in a different thread,
failing to release the lock.

The failure to release the lock is not detected because
the execution is swallowed by the lambda: the situation isn't
immediately critical, because *IF* there is no huge load
of changes coming through, the next invocation may land on the
same thread. However, if that doesn't happen because of
parallel load using more threads, the JVM ends up in deadlock.

Semaphores do not require to be acquired and released by
the same threads and would functionally perform the same
type of exclusive "logical locking" for preventing the
concurrent reindexing of the same entity.

Bug: Issue 13824
Change-Id: I39484b0d0cdf21d60bf334d1d1fa90f07ea28bba
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
index 332a7d6..7dad07d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -21,34 +21,44 @@
 import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
 
 public class IndexEventLocks {
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
 
   private static final int NUMBER_OF_INDEX_TASK_TYPES = 4;
 
-  private final Striped<Lock> locks;
+  private final Striped<Semaphore> semaphores;
   private final long waitTimeout;
 
   @Inject
   public IndexEventLocks(Configuration cfg) {
-    this.locks = Striped.lock(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks());
+    this.semaphores =
+        Striped.semaphore(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks(), 1);
     this.waitTimeout = cfg.index().waitTimeout();
   }
 
   public CompletableFuture<?> withLock(
       IndexTask id, IndexCallFunction function, VoidFunction lockAcquireTimeoutCallback) {
     String indexId = id.indexId();
-    Lock idLock = getLock(indexId);
+    Semaphore idSemaphore = getSemaphore(indexId);
     try {
-      if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
+      log.atFine().log("Trying to acquire %s", id);
+      if (idSemaphore.tryAcquire(waitTimeout, TimeUnit.MILLISECONDS)) {
+        log.atFine().log("Acquired %s", id);
         return function
             .invoke()
             .whenComplete(
                 (result, error) -> {
-                  idLock.unlock();
+                  try {
+                    log.atFine().log("Trying to release %s", id);
+                    idSemaphore.release();
+                    log.atFine().log("Released %s", id);
+                  } catch (Throwable t) {
+                    log.atSevere().withCause(t).log("Unable to release %s", id);
+                    throw t;
+                  }
                 });
       }
 
@@ -70,8 +80,8 @@
   }
 
   @VisibleForTesting
-  protected Lock getLock(String indexId) {
-    return locks.get(indexId);
+  protected Semaphore getSemaphore(String indexId) {
+    return semaphores.get(indexId);
   }
 
   @FunctionalInterface
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index ffad41f..17013cc 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -53,10 +53,10 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.junit.Before;
@@ -173,10 +173,11 @@
   @Test
   public void shouldNotIndexChangeWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
     setUpIndexEventHandler(currCtx, locks);
 
     indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
@@ -187,9 +188,10 @@
   @Test
   public void shouldNotIndexAccountWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
     setUpIndexEventHandler(currCtx, locks);
 
@@ -201,9 +203,10 @@
   @Test
   public void shouldNotDeleteChangeWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
     setUpIndexEventHandler(currCtx, locks);
 
@@ -215,9 +218,10 @@
   @Test
   public void shouldNotIndexGroupWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
     setUpIndexEventHandler(currCtx, locks);
 
@@ -229,9 +233,10 @@
   @Test
   public void shouldNotIndexProjectWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
     setUpIndexEventHandler(currCtx, locks);
 
@@ -243,10 +248,10 @@
   @Test
   public void shouldRetryIndexChangeWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
         .thenReturn(false, true);
     setUpIndexEventHandler(currCtx, locks);
 
@@ -259,10 +264,11 @@
   @Test
   public void shouldRetryUpToMaxTriesWhenCannotAcquireLock() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
 
     Configuration cfg = mock(Configuration.class);
     Configuration.Http httpCfg = mock(Configuration.Http.class);
@@ -279,10 +285,11 @@
   @Test
   public void shouldNotRetryWhenMaxTriesLowerThanOne() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock lock = mock(Lock.class);
-    when(locks.getLock(anyString())).thenReturn(lock);
+    Semaphore semaphore = mock(Semaphore.class);
+    when(locks.getSemaphore(anyString())).thenReturn(semaphore);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
-    when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+    when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+        .thenReturn(false);
 
     Configuration cfg = mock(Configuration.class);
     Configuration.Http httpCfg = mock(Configuration.Http.class);
@@ -299,14 +306,14 @@
   @Test
   public void shouldLockPerIndexEventType() throws Exception {
     IndexEventLocks locks = mock(IndexEventLocks.class);
-    Lock indexChangeLock = mock(Lock.class);
-    when(indexChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+    Semaphore indexChangeLock = mock(Semaphore.class);
+    when(indexChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
         .thenReturn(false);
-    Lock accountChangeLock = mock(Lock.class);
-    when(accountChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+    Semaphore accountChangeLock = mock(Semaphore.class);
+    when(accountChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
         .thenReturn(true);
-    when(locks.getLock(eq("change/" + CHANGE_ID))).thenReturn(indexChangeLock);
-    when(locks.getLock(eq("account/" + ACCOUNT_ID))).thenReturn(accountChangeLock);
+    when(locks.getSemaphore(eq("change/" + CHANGE_ID))).thenReturn(indexChangeLock);
+    when(locks.getSemaphore(eq("account/" + ACCOUNT_ID))).thenReturn(accountChangeLock);
     Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
     setUpIndexEventHandler(currCtx, locks);