Merge "Fix indexing locks on the sender side" into stable-3.0
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
index 332a7d6..7dad07d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventLocks.java
@@ -21,34 +21,44 @@
import com.google.common.util.concurrent.Striped;
import com.google.inject.Inject;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
public class IndexEventLocks {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
private static final int NUMBER_OF_INDEX_TASK_TYPES = 4;
- private final Striped<Lock> locks;
+ private final Striped<Semaphore> semaphores;
private final long waitTimeout;
@Inject
public IndexEventLocks(Configuration cfg) {
- this.locks = Striped.lock(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks());
+ this.semaphores =
+ Striped.semaphore(NUMBER_OF_INDEX_TASK_TYPES * cfg.index().numStripedLocks(), 1);
this.waitTimeout = cfg.index().waitTimeout();
}
public CompletableFuture<?> withLock(
IndexTask id, IndexCallFunction function, VoidFunction lockAcquireTimeoutCallback) {
String indexId = id.indexId();
- Lock idLock = getLock(indexId);
+ Semaphore idSemaphore = getSemaphore(indexId);
try {
- if (idLock.tryLock(waitTimeout, TimeUnit.MILLISECONDS)) {
+ log.atFine().log("Trying to acquire %s", id);
+ if (idSemaphore.tryAcquire(waitTimeout, TimeUnit.MILLISECONDS)) {
+ log.atFine().log("Acquired %s", id);
return function
.invoke()
.whenComplete(
(result, error) -> {
- idLock.unlock();
+ try {
+ log.atFine().log("Trying to release %s", id);
+ idSemaphore.release();
+ log.atFine().log("Released %s", id);
+ } catch (Throwable t) {
+ log.atSevere().withCause(t).log("Unable to release %s", id);
+ throw t;
+ }
});
}
@@ -70,8 +80,8 @@
}
@VisibleForTesting
- protected Lock getLock(String indexId) {
- return locks.get(indexId);
+ protected Semaphore getSemaphore(String indexId) {
+ return semaphores.get(indexId);
}
@FunctionalInterface
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index ffad41f..17013cc 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -53,10 +53,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Before;
@@ -173,10 +173,11 @@
@Test
public void shouldNotIndexChangeWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
@@ -187,9 +188,10 @@
@Test
public void shouldNotIndexAccountWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
@@ -201,9 +203,10 @@
@Test
public void shouldNotDeleteChangeWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
@@ -215,9 +218,10 @@
@Test
public void shouldNotIndexGroupWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
@@ -229,9 +233,10 @@
@Test
public void shouldNotIndexProjectWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
@@ -243,10 +248,10 @@
@Test
public void shouldRetryIndexChangeWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false, true);
setUpIndexEventHandler(currCtx, locks);
@@ -259,10 +264,11 @@
@Test
public void shouldRetryUpToMaxTriesWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
Configuration cfg = mock(Configuration.class);
Configuration.Http httpCfg = mock(Configuration.Http.class);
@@ -279,10 +285,11 @@
@Test
public void shouldNotRetryWhenMaxTriesLowerThanOne() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock lock = mock(Lock.class);
- when(locks.getLock(anyString())).thenReturn(lock);
+ Semaphore semaphore = mock(Semaphore.class);
+ when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
- when(lock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS))).thenReturn(false);
+ when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ .thenReturn(false);
Configuration cfg = mock(Configuration.class);
Configuration.Http httpCfg = mock(Configuration.Http.class);
@@ -299,14 +306,14 @@
@Test
public void shouldLockPerIndexEventType() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
- Lock indexChangeLock = mock(Lock.class);
- when(indexChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ Semaphore indexChangeLock = mock(Semaphore.class);
+ when(indexChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
- Lock accountChangeLock = mock(Lock.class);
- when(accountChangeLock.tryLock(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
+ Semaphore accountChangeLock = mock(Semaphore.class);
+ when(accountChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(true);
- when(locks.getLock(eq("change/" + CHANGE_ID))).thenReturn(indexChangeLock);
- when(locks.getLock(eq("account/" + ACCOUNT_ID))).thenReturn(accountChangeLock);
+ when(locks.getSemaphore(eq("change/" + CHANGE_ID))).thenReturn(indexChangeLock);
+ when(locks.getSemaphore(eq("account/" + ACCOUNT_ID))).thenReturn(accountChangeLock);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);