blob: 578dc179570aa83e0da1f07891001d88e96926be [file] [log] [blame]
// Copyright (C) 2015 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.ericsson.gerrit.plugins.highavailability.index;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.DeleteChangeTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexAccountTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexChangeTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexGroupTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexTask;
import com.ericsson.gerrit.plugins.highavailability.index.IndexEventLocks.VoidFunction;
import com.google.gerrit.entities.Account;
import com.google.gerrit.entities.AccountGroup;
import com.google.gerrit.entities.Change;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class IndexEventHandlerTest {
private static final String PLUGIN_NAME = "high-availability";
private static final String PROJECT_NAME = "test/project";
private static final int CHANGE_ID = 1;
private static final int ACCOUNT_ID = 2;
private static final String UUID = "3";
private static final String OTHER_UUID = "4";
private static final Integer INDEX_WAIT_TIMEOUT_MS = 5;
private static final int MAX_TEST_PARALLELISM = 4;
private IndexEventHandler indexEventHandler;
@Mock private Forwarder forwarder;
@Mock private ChangeCheckerImpl.Factory changeCheckerFactoryMock;
@Mock private ChangeChecker changeCheckerMock;
private Change.Id changeId;
private Account.Id accountId;
private AccountGroup.UUID accountGroupUUID;
private ScheduledExecutorService executor = new CurrentThreadScheduledExecutorService();
private ScheduledExecutorService batchExecutor = new CurrentThreadScheduledExecutorService();
private ScheduledExecutorService testExecutor =
Executors.newScheduledThreadPool(MAX_TEST_PARALLELISM);
@Mock private RequestContext mockCtx;
@Mock private Configuration configuration;
private IndexEventLocks idLocks;
private CurrentRequestContext currCtx =
new CurrentRequestContext(null, null, null) {
@Override
public void onlyWithContext(Consumer<RequestContext> body) {
body.accept(mockCtx);
}
};
@Before
public void setUpMocks() throws Exception {
changeId = Change.id(CHANGE_ID);
accountId = Account.id(ACCOUNT_ID);
accountGroupUUID = AccountGroup.uuid(UUID);
when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
Configuration.Index cfgIndex = mock(Configuration.Index.class);
when(configuration.index()).thenReturn(cfgIndex);
when(cfgIndex.numStripedLocks()).thenReturn(Configuration.DEFAULT_NUM_STRIPED_LOCKS);
Configuration.Http http = mock(Configuration.Http.class);
when(configuration.http()).thenReturn(http);
when(http.maxTries()).thenReturn(Configuration.Http.DEFAULT_MAX_TRIES);
when(http.retryInterval()).thenReturn(Configuration.Http.DEFAULT_RETRY_INTERVAL);
when(forwarder.indexAccount(eq(ACCOUNT_ID), any()))
.thenReturn(CompletableFuture.completedFuture(true));
when(forwarder.deleteChangeFromIndex(eq(CHANGE_ID), any()))
.thenReturn(CompletableFuture.completedFuture(true));
when(forwarder.indexGroup(eq(UUID), any())).thenReturn(CompletableFuture.completedFuture(true));
when(forwarder.indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any()))
.thenReturn(CompletableFuture.completedFuture(true));
idLocks = new IndexEventLocks(configuration);
setUpIndexEventHandler(currCtx);
}
public void setUpIndexEventHandler(CurrentRequestContext currCtx) throws Exception {
setUpIndexEventHandler(currCtx, idLocks, configuration);
}
public void setUpIndexEventHandler(CurrentRequestContext currCtx, IndexEventLocks idLocks)
throws Exception {
setUpIndexEventHandler(currCtx, idLocks, configuration);
}
public void setUpIndexEventHandler(
CurrentRequestContext currCtx, IndexEventLocks idLocks, Configuration configuration)
throws Exception {
indexEventHandler =
new IndexEventHandler(
executor,
batchExecutor,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
currCtx,
configuration,
idLocks);
}
@Test
public void shouldIndexInRemoteOnChangeIndexedEvent() throws Exception {
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(forwarder).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldNotIndexInRemoteWhenContextIsMissing() throws Exception {
ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
Configuration cfgMock = mock(Configuration.class);
Configuration.Index cfgIndex = mock(Configuration.Index.class);
when(cfgMock.index()).thenReturn(cfgIndex);
setUpIndexEventHandler(new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldNotIndexChangeWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldNotIndexAccountWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onAccountIndexed(accountId.get());
verify(forwarder, never()).indexAccount(eq(ACCOUNT_ID), any());
}
@Test
public void shouldNotDeleteChangeWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onChangeDeleted(changeId.get());
verify(forwarder, never()).deleteChangeFromIndex(eq(CHANGE_ID), any());
}
@Test
public void shouldNotIndexGroupWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
verify(forwarder, never()).indexGroup(eq(UUID), any());
}
@Test
public void shouldNotIndexProjectWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onProjectIndexed(PROJECT_NAME);
verify(forwarder, never()).indexProject(eq(PROJECT_NAME), any());
}
@Test
public void shouldRetryIndexChangeWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false, true);
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(locks, times(2)).withLock(any(), any(), any());
verify(forwarder, times(1)).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldRetryUpToMaxTriesWhenCannotAcquireLock() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Configuration cfg = mock(Configuration.class);
Configuration.Http httpCfg = mock(Configuration.Http.class);
when(httpCfg.maxTries()).thenReturn(10);
when(cfg.http()).thenReturn(httpCfg);
setUpIndexEventHandler(currCtx, locks, cfg);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(locks, times(11)).withLock(any(), any(), any());
verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldNotRetryWhenMaxTriesLowerThanOne() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore semaphore = mock(Semaphore.class);
when(locks.getSemaphore(anyString())).thenReturn(semaphore);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
when(semaphore.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Configuration cfg = mock(Configuration.class);
Configuration.Http httpCfg = mock(Configuration.Http.class);
when(httpCfg.maxTries()).thenReturn(0);
when(cfg.http()).thenReturn(httpCfg);
setUpIndexEventHandler(currCtx, locks, cfg);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(locks, times(1)).withLock(any(), any(), any());
verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldLockPerIndexEventType() throws Exception {
IndexEventLocks locks = mock(IndexEventLocks.class);
Semaphore indexChangeLock = mock(Semaphore.class);
when(indexChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(false);
Semaphore accountChangeLock = mock(Semaphore.class);
when(accountChangeLock.tryAcquire(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS)))
.thenReturn(true);
when(locks.getSemaphore(eq("change/" + CHANGE_ID))).thenReturn(indexChangeLock);
when(locks.getSemaphore(eq("account/" + ACCOUNT_ID))).thenReturn(accountChangeLock);
Mockito.doCallRealMethod().when(locks).withLock(any(), any(), any());
setUpIndexEventHandler(currCtx, locks);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
indexEventHandler.onAccountIndexed(accountId.get());
verify(forwarder, never()).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
}
@Test
public void shouldReindexInRemoteWhenContextIsMissingButForcedIndexingEnabled() throws Exception {
ThreadLocalRequestContext threadLocalCtxMock = mock(ThreadLocalRequestContext.class);
OneOffRequestContext oneOffCtxMock = mock(OneOffRequestContext.class);
Configuration cfgMock = mock(Configuration.class);
Configuration.Index cfgIndex = mock(Configuration.Index.class);
when(cfgMock.index()).thenReturn(cfgIndex);
when(cfgIndex.synchronizeForced()).thenReturn(true);
setUpIndexEventHandler(new CurrentRequestContext(threadLocalCtxMock, cfgMock, oneOffCtxMock));
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(forwarder).indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any());
}
@Test
public void shouldIndexInRemoteOnAccountIndexedEvent() throws Exception {
indexEventHandler.onAccountIndexed(accountId.get());
verify(forwarder).indexAccount(eq(ACCOUNT_ID), any());
}
@Test
public void shouldDeleteFromIndexInRemoteOnChangeDeletedEvent() throws Exception {
indexEventHandler.onChangeDeleted(changeId.get());
verify(forwarder).deleteChangeFromIndex(eq(CHANGE_ID), any());
verifyZeroInteractions(
changeCheckerMock); // Deleted changes should not be checked against NoteDb
}
@Test
public void shouldIndexInRemoteOnGroupIndexedEvent() throws Exception {
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
verify(forwarder).indexGroup(eq(UUID), any());
}
@Test
public void shouldNotCallRemoteWhenChangeEventIsForwarded() throws Exception {
Context.setForwardedEvent(true);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
indexEventHandler.onChangeDeleted(changeId.get());
Context.unsetForwardedEvent();
verifyZeroInteractions(forwarder);
}
@Test
public void shouldNotCallRemoteWhenAccountEventIsForwarded() throws Exception {
Context.setForwardedEvent(true);
indexEventHandler.onAccountIndexed(accountId.get());
indexEventHandler.onAccountIndexed(accountId.get());
Context.unsetForwardedEvent();
verifyZeroInteractions(forwarder);
}
@Test
public void shouldNotCallRemoteWhenGroupEventIsForwarded() throws Exception {
Context.setForwardedEvent(true);
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
Context.unsetForwardedEvent();
verifyZeroInteractions(forwarder);
}
@Test
public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
new IndexEventHandler(
poolMock,
poolBatchMock,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
currCtx,
configuration,
idLocks);
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
verify(poolMock, times(1))
.execute(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null));
}
@Test
public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
new IndexEventHandler(
poolMock,
poolBatchMock,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
currCtx,
configuration,
idLocks);
indexEventHandler.onAccountIndexed(accountId.get());
indexEventHandler.onAccountIndexed(accountId.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
}
@Test
public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
new IndexEventHandler(
poolMock,
poolBatchMock,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
currCtx,
configuration,
idLocks);
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
indexEventHandler.onGroupIndexed(accountGroupUUID.get());
verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));
}
@Test
public void testIndexChangeTaskToString() throws Exception {
IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null);
assertThat(task.toString())
.isEqualTo(
String.format("[%s] Index change %s in target instance", PLUGIN_NAME, CHANGE_ID));
}
@Test
public void testIndexAccountTaskToString() throws Exception {
IndexAccountTask task = indexEventHandler.new IndexAccountTask(ACCOUNT_ID);
assertThat(task.toString())
.isEqualTo(
String.format("[%s] Index account %s in target instance", PLUGIN_NAME, ACCOUNT_ID));
}
@Test
public void testIndexGroupTaskToString() throws Exception {
IndexGroupTask task = indexEventHandler.new IndexGroupTask(UUID);
assertThat(task.toString())
.isEqualTo(String.format("[%s] Index group %s in target instance", PLUGIN_NAME, UUID));
}
@Test
public void testIndexChangeTaskHashCodeAndEquals() {
IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null);
IndexChangeTask sameTask = task;
assertThat(task.equals(sameTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
IndexChangeTask identicalTask =
indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, null);
assertThat(task.equals(identicalTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
assertThat(task.equals(null)).isFalse();
assertThat(
task.equals(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID + 1, null)))
.isFalse();
assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
IndexChangeTask differentChangeIdTask =
indexEventHandler.new IndexChangeTask(PROJECT_NAME, 123, null);
assertThat(task.equals(differentChangeIdTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(differentChangeIdTask.hashCode());
}
@Test
public void testDeleteChangeTaskHashCodeAndEquals() {
DeleteChangeTask task = indexEventHandler.new DeleteChangeTask(CHANGE_ID, null);
DeleteChangeTask sameTask = task;
assertThat(task.equals(sameTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
DeleteChangeTask identicalTask = indexEventHandler.new DeleteChangeTask(CHANGE_ID, null);
assertThat(task.equals(identicalTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
assertThat(task.equals(null)).isFalse();
assertThat(task.equals(indexEventHandler.new DeleteChangeTask(CHANGE_ID + 1, null))).isFalse();
assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
DeleteChangeTask differentChangeIdTask = indexEventHandler.new DeleteChangeTask(123, null);
assertThat(task.equals(differentChangeIdTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(differentChangeIdTask.hashCode());
}
@Test
public void testIndexAccountTaskHashCodeAndEquals() {
IndexAccountTask task = indexEventHandler.new IndexAccountTask(ACCOUNT_ID);
IndexAccountTask sameTask = task;
assertThat(task.equals(sameTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
IndexAccountTask identicalTask = indexEventHandler.new IndexAccountTask(ACCOUNT_ID);
assertThat(task.equals(identicalTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
assertThat(task.equals(null)).isFalse();
assertThat(task.equals(indexEventHandler.new IndexAccountTask(ACCOUNT_ID + 1))).isFalse();
assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
IndexAccountTask differentAccountIdTask = indexEventHandler.new IndexAccountTask(123);
assertThat(task.equals(differentAccountIdTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(differentAccountIdTask.hashCode());
}
@Test
public void testIndexGroupTaskHashCodeAndEquals() {
IndexGroupTask task = indexEventHandler.new IndexGroupTask(UUID);
IndexGroupTask sameTask = task;
assertThat(task.equals(sameTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
IndexGroupTask identicalTask = indexEventHandler.new IndexGroupTask(UUID);
assertThat(task.equals(identicalTask)).isTrue();
assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
assertThat(task.equals(null)).isFalse();
assertThat(task.equals(indexEventHandler.new IndexGroupTask(OTHER_UUID))).isFalse();
assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
IndexGroupTask differentGroupIdTask = indexEventHandler.new IndexGroupTask("123");
assertThat(task.equals(differentGroupIdTask)).isFalse();
assertThat(task.hashCode()).isNotEqualTo(differentGroupIdTask.hashCode());
}
class TestTask<T> implements Runnable {
private IndexTask task;
private CyclicBarrier testBarrier;
private Supplier<T> successFunc;
private VoidFunction failureFunc;
private CompletableFuture<T> future;
public TestTask(
IndexTask task,
CyclicBarrier testBarrier,
Supplier<T> successFunc,
VoidFunction failureFunc) {
this.task = task;
this.testBarrier = testBarrier;
this.successFunc = successFunc;
this.failureFunc = failureFunc;
this.future = new CompletableFuture<>();
}
@SuppressWarnings("unchecked")
@Override
public void run() {
try {
testBarrier.await();
idLocks
.withLock(
task,
() ->
runLater(
INDEX_WAIT_TIMEOUT_MS * 2,
() -> CompletableFuture.completedFuture(successFunc.get())),
failureFunc)
.whenComplete(
(v, t) -> {
if (t == null) {
future.complete((T) v);
} else {
future.completeExceptionally(t);
}
});
} catch (Throwable t) {
future = new CompletableFuture<>();
future.completeExceptionally(t);
}
}
public void join() {
try {
future.join();
} catch (Exception e) {
}
}
private CompletableFuture<T> runLater(
long scheduledTimeMsec, Supplier<CompletableFuture<T>> supplier) {
CompletableFuture<T> resFuture = new CompletableFuture<>();
testExecutor.schedule(
() -> {
try {
return supplier
.get()
.whenComplete(
(v, t) -> {
if (t == null) {
resFuture.complete(v);
}
resFuture.completeExceptionally(t);
});
} catch (Throwable t) {
return resFuture.completeExceptionally(t);
}
},
scheduledTimeMsec,
TimeUnit.MILLISECONDS);
return resFuture;
}
}
@Test
public void indexLocksShouldBlockConcurrentIndexChange() throws Exception {
IndexChangeTask indexTask1 =
indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, new IndexEvent());
IndexChangeTask indexTask2 =
indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, new IndexEvent());
testIsolationOfCuncurrentIndexTasks(indexTask1, indexTask2);
}
@Test
public void indexLocksShouldBlockConcurrentIndexAndDeleteChange() throws Exception {
IndexChangeTask indexTask =
indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, new IndexEvent());
DeleteChangeTask deleteTask =
indexEventHandler.new DeleteChangeTask(CHANGE_ID, new IndexEvent());
testIsolationOfCuncurrentIndexTasks(indexTask, deleteTask);
}
private void testIsolationOfCuncurrentIndexTasks(IndexTask indexTask1, IndexTask indexTask2)
throws Exception {
AtomicInteger changeIndexedCount = new AtomicInteger();
AtomicInteger lockFailedCounts = new AtomicInteger();
CyclicBarrier changeThreadsSync = new CyclicBarrier(2);
TestTask<Integer> task1 =
new TestTask<>(
indexTask1,
changeThreadsSync,
() -> changeIndexedCount.incrementAndGet(),
() -> lockFailedCounts.incrementAndGet());
TestTask<Integer> task2 =
new TestTask<>(
indexTask2,
changeThreadsSync,
() -> changeIndexedCount.incrementAndGet(),
() -> lockFailedCounts.incrementAndGet());
new Thread(task1).start();
new Thread(task2).start();
task1.join();
task2.join();
/* Both assertions needs to be true, the order doesn't really matter:
* - Only one of the two tasks should succeed
* - Only one of the two tasks should fail to acquire the lock
*/
assertThat(changeIndexedCount.get()).isEqualTo(1);
assertThat(lockFailedCounts.get()).isEqualTo(1);
}
private class CurrentThreadScheduledExecutorService implements ScheduledExecutorService {
@Override
public void shutdown() {}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return null;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return null;
}
@Override
public Future<?> submit(Runnable task) {
return null;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return null;
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
command.run();
return null;
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return null;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return null;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return null;
}
}
}