Merge branch 'stable-3.6' into stable-3.7 * stable-3.6: Specify cache.threads = 0 in the multi-site config documentation Change-Id: Ia18c481aeba170f5127877abd875f012bf50bc1b
diff --git a/docker-compose.kafka-broker.yaml b/docker-compose.kafka-broker.yaml deleted file mode 100644 index d3fc713..0000000 --- a/docker-compose.kafka-broker.yaml +++ /dev/null
@@ -1,14 +0,0 @@ -version: '3' -services: - zookeeper: - image: wurstmeister/zookeeper:latest - ports: - - "2181:2181" - kafka: - image: wurstmeister/kafka:2.13-2.6.3 - ports: - - "9092:9092" - environment: - KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 - KAFKA_CREATE_TOPICS: "gerrit:1:1" - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
diff --git a/e2e-tests/docker-compose-kafka.yaml b/e2e-tests/docker-compose-kafka.yaml index addb8d5..b02b58d 100644 --- a/e2e-tests/docker-compose-kafka.yaml +++ b/e2e-tests/docker-compose-kafka.yaml
@@ -1,12 +1,11 @@ version: '3' services: kafka: - image: wurstmeister/kafka:2.13-2.6.3 + image: bitnami/kafka:3.6.0 depends_on: - zookeeper environment: - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 gerrit1: depends_on:
diff --git a/e2e-tests/docker-compose.yaml b/e2e-tests/docker-compose.yaml index 3abb757..2ac7b06 100644 --- a/e2e-tests/docker-compose.yaml +++ b/e2e-tests/docker-compose.yaml
@@ -1,7 +1,9 @@ version: '3' services: zookeeper: - image: wurstmeister/zookeeper:latest + image: bitnami/zookeeper:3.8.3 + environment: + ALLOW_ANONYMOUS_LOGIN: "true" gerrit1: image: gerritcodereview/gerrit:${GERRIT_IMAGE}
diff --git a/e2e-tests/test.sh b/e2e-tests/test.sh index b069565..58029ba 100755 --- a/e2e-tests/test.sh +++ b/e2e-tests/test.sh
@@ -16,11 +16,11 @@ LOCATION="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" LOCAL_ENV="$( cd "${LOCATION}/../setup_local_env" >/dev/null 2>&1 && pwd )" -GERRIT_BRANCH=stable-3.6 +GERRIT_BRANCH=stable-3.7 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins DEF_MULTISITE_LOCATION=${LOCATION}/../../../bazel-bin/plugins/multi-site/multi-site.jar -DEF_GERRIT_IMAGE=3.4.0-centos8 +DEF_GERRIT_IMAGE=3.7.4 DEF_GERRIT_HEALTHCHECK_START_PERIOD=60s DEF_GERRIT_HEALTHCHECK_INTERVAL=5s DEF_GERRIT_HEALTHCHECK_TIMEOUT=5s
diff --git a/setup_local_env/docker-compose-core.yaml b/setup_local_env/docker-compose-core.yaml index dab168d..f8ef318 100644 --- a/setup_local_env/docker-compose-core.yaml +++ b/setup_local_env/docker-compose-core.yaml
@@ -1,10 +1,12 @@ version: '3' services: zookeeper: - image: wurstmeister/zookeeper:latest + image: bitnami/zookeeper:3.8.3 ports: - "2181:2181" container_name: zk_test_node + environment: + ALLOW_ANONYMOUS_LOGIN: "true" prometheus: container_name: prometheus_test_node image: prom/prometheus:v2.16.0
diff --git a/setup_local_env/docker-compose-kafka.yaml b/setup_local_env/docker-compose-kafka.yaml index 2a5b0fc..70fda58 100644 --- a/setup_local_env/docker-compose-kafka.yaml +++ b/setup_local_env/docker-compose-kafka.yaml
@@ -1,13 +1,13 @@ version: '3' services: kafka: - image: wurstmeister/kafka:2.13-2.6.3 + image: bitnami/kafka:3.6.0 ports: - "9092:9092" container_name: kafka_test_node environment: - KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 networks: - setup_local_env_default networks:
diff --git a/setup_local_env/setup.sh b/setup_local_env/setup.sh index 7697f24..d5ded8c 100755 --- a/setup_local_env/setup.sh +++ b/setup_local_env/setup.sh
@@ -16,7 +16,7 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -GERRIT_BRANCH=stable-3.6 +GERRIT_BRANCH=stable-3.7 GERRIT_CI=https://gerrit-ci.gerritforge.com/view/Plugins-$GERRIT_BRANCH/job LAST_BUILD=lastSuccessfulBuild/artifact/bazel-bin/plugins
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java index 522a78f..15f7c13 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandler.java
@@ -56,34 +56,49 @@ @Override protected void doIndex(String id, Optional<ChangeIndexEvent> indexEvent) { - attemptToIndex(id, indexEvent, 0); + scheduleIndexing(id, indexEvent, this::indexIfConsistent); + } + + private void indexIfConsistent(String id) { + if (isChangeConsistent(id)) { + reindex(id); + } + } + + private boolean isChangeConsistent(String id) { + ChangeChecker checker = changeCheckerFactory.create(id); + Optional<ChangeNotes> changeNotes = checker.getChangeNotes(); + return changeNotes.isPresent() && checker.isChangeConsistent(); } @Override - protected void attemptToIndex(String id, Optional<ChangeIndexEvent> indexEvent, int retryCount) { + protected void attemptToIndex(String id) { ChangeChecker checker = changeCheckerFactory.create(id); Optional<ChangeNotes> changeNotes = checker.getChangeNotes(); boolean changeIsPresent = changeNotes.isPresent(); boolean changeIsConsistent = checker.isChangeConsistent(); if (changeIsPresent && changeIsConsistent) { - reindexAndCheckIsUpToDate(id, indexEvent, checker, retryCount); + reindexAndCheckIsUpToDate(id, checker); } else { + IndexingRetry retry = indexingRetryTaskMap.get(id); log.warn( "Change {} {} in local Git repository (event={}) after {} attempt(s)", id, !changeIsPresent ? "not present yet" : (changeIsConsistent ? "is" : "is not") + " consistent", - indexEvent, - retryCount); - if (!rescheduleIndex(id, indexEvent, retryCount + 1)) { + retry.getEvent(), + retry.getRetryNumber()); + + retry.incrementRetryNumber(); + if (!rescheduleIndex(id)) { log.error( "Change {} {} in the local Git repository (event={})", id, !changeIsPresent ? "could not be found" : (changeIsConsistent ? "was" : "was not") + " consistent", - indexEvent); + retry.getEvent()); } } } @@ -95,7 +110,7 @@ Optional<ChangeNotes> changeNotes = checker.getChangeNotes(); ChangeNotes notes = changeNotes.get(); notes.reload(); - indexer.index(notes.getChange()); + indexer.index(notes); } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java index c4906a9..d05405f 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexGroupHandler.java
@@ -52,7 +52,7 @@ @Override protected void doIndex(String uuid, Optional<GroupIndexEvent> event) { - attemptToIndex(uuid, event, 0); + scheduleIndexing(uuid, event, this::reindex); } @Override @@ -66,9 +66,8 @@ } @Override - protected void attemptToIndex( - String uuid, Optional<GroupIndexEvent> groupIndexEvent, int retryCount) { - reindexAndCheckIsUpToDate(uuid, groupIndexEvent, groupChecker, retryCount); + protected void attemptToIndex(String uuid) { + reindexAndCheckIsUpToDate(uuid, groupChecker); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java index 3787a80..2054f10 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexProjectHandler.java
@@ -52,7 +52,7 @@ @Override protected void doIndex(String projectName, Optional<ProjectIndexEvent> event) { - attemptToIndex(projectName, event, 0); + scheduleIndexing(projectName, event, this::reindex); } @Override @@ -66,8 +66,8 @@ } @Override - protected void attemptToIndex(String id, Optional<ProjectIndexEvent> indexEvent, int retryCount) { - reindexAndCheckIsUpToDate(id, indexEvent, projectChecker, retryCount); + protected void attemptToIndex(String id) { + reindexAndCheckIsUpToDate(id, projectChecker); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java index 5c64431..f3d4e70 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java +++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexingHandlerWithRetries.java
@@ -19,10 +19,13 @@ import com.googlesource.gerrit.plugins.multisite.Configuration; import com.googlesource.gerrit.plugins.multisite.forwarder.events.IndexEvent; import com.googlesource.gerrit.plugins.multisite.index.UpToDateChecker; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Base class to handle forwarded indexing. This class is meant to be extended by classes used on @@ -37,6 +40,7 @@ private final int maxTries; private final ScheduledExecutorService indexExecutor; protected final OneOffRequestContext oneOffCtx; + protected final Map<T, IndexingRetry> indexingRetryTaskMap = new ConcurrentHashMap<>(); ForwardedIndexingHandlerWithRetries( ScheduledExecutorService indexExecutor, @@ -55,22 +59,34 @@ protected abstract String indexName(); - protected abstract void attemptToIndex(T id, Optional<E> indexEvent, int retryCount); + protected abstract void attemptToIndex(T id); - protected boolean rescheduleIndex(T id, Optional<E> indexEvent, int retryCount) { - if (retryCount > maxTries) { + protected boolean rescheduleIndex(T id) { + IndexingRetry retry = indexingRetryTaskMap.get(id); + if (retry == null) { + log.info( + "{} {} successfully indexed by different task, rescheduling isn't needed", + indexName(), + id); + return true; + } + if (retry.getRetryNumber() > maxTries) { log.error( "{} {} could not be indexed after {} retries. {} index could be stale.", indexName(), id, - retryCount, + retry.getRetryNumber(), indexName()); + if (!indexingRetryTaskMap.remove(id, retry)) { + log.debug( + "{} {} not removed from retry map because of racy addition of a new retry indexing retry"); + } return false; } log.warn( "Retrying for the #{} time to index {} {} after {} msecs", - retryCount, + retry.getRetryNumber(), indexName(), id, retryInterval); @@ -80,7 +96,7 @@ () -> { try (ManualRequestContext ctx = oneOffCtx.open()) { Context.setForwardedEvent(true); - attemptToIndex(id, indexEvent, retryCount); + attemptToIndex(id); } catch (Exception e) { log.warn("{} {} could not be indexed", indexName(), id, e); } @@ -90,20 +106,66 @@ return true; } - public final void reindexAndCheckIsUpToDate( - T id, Optional<E> indexEvent, UpToDateChecker<E> upToDateChecker, int retryCount) { - reindex(id); - - if (!upToDateChecker.isUpToDate(indexEvent)) { - log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id); - rescheduleIndex(id, indexEvent, retryCount + 1); + public void scheduleIndexing(T id, Optional<E> event, Consumer<T> indexOnce) { + IndexingRetry retry = new IndexingRetry(event); + if (indexingRetryTaskMap.put(id, retry) != null) { + indexOnce.accept(id); + log.info( + "Skipping indexing because there is already a running task for the specified id. Index name: {}, task id: {}", + indexName(), + id); return; } - if (retryCount > 0) { + attemptToIndex(id); + } + + public final void reindexAndCheckIsUpToDate(T id, UpToDateChecker<E> upToDateChecker) { + reindex(id); + IndexingRetry retry = indexingRetryTaskMap.get(id); + if (retry == null) { + log.warn("{} {} successfully indexed by different task", indexName(), id); + return; + } + if (!upToDateChecker.isUpToDate(retry.getEvent())) { + log.warn("{} {} is not up-to-date. Rescheduling", indexName(), id); + retry.incrementRetryNumber(); + rescheduleIndex(id); + return; + } + + if (retry.getRetryNumber() > 0) { log.warn( - "{} {} has been eventually indexed after {} attempt(s)", indexName(), id, retryCount); + "{} {} has been eventually indexed after {} attempt(s)", + indexName(), + id, + retry.getRetryNumber()); } else { log.debug("{} {} successfully indexed", indexName(), id); } + if (!indexingRetryTaskMap.remove(id, retry)) { + log.debug( + "{} {} not removed from retry map because of racy addition of a new retry indexing retry"); + } + } + + public class IndexingRetry { + private final Optional<E> event; + private int retryNumber = 0; + + public IndexingRetry(Optional<E> event) { + this.event = event; + } + + public int getRetryNumber() { + return retryNumber; + } + + public Optional<E> getEvent() { + return event; + } + + public void incrementRetryNumber() { + ++retryNumber; + } } }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java index 2c4b157..7133a0d 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -88,7 +88,6 @@ when(ctxMock.open()).thenReturn(manualRequestContextMock); id = Change.id(TEST_CHANGE_NUMBER); change = new Change(null, id, null, null, TimeUtil.now()); - when(changeNotes.getChange()).thenReturn(change); when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock); when(configurationMock.index()).thenReturn(index); when(index.numStripedLocks()).thenReturn(10); @@ -102,7 +101,7 @@ public void changeIsIndexedWhenUpToDate() throws Exception { setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE, CHANGE_CONSISTENT); handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()); - verify(indexerMock, times(1)).index(any(Change.class)); + verify(indexerMock, times(1)).index(any(ChangeNotes.class)); } @Test @@ -112,7 +111,7 @@ TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id"))); - verify(indexerMock, times(1)).index(any(Change.class)); + verify(indexerMock, times(1)).index(any(ChangeNotes.class)); } @Test @@ -127,14 +126,14 @@ TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id"))); - verify(indexerMock, never()).index(any(Change.class)); + verify(indexerMock, never()).index(any(ChangeNotes.class)); verify(indexExecutorMock, times(1)).schedule(any(Runnable.class), anyLong(), any()); handler.index( TEST_CHANGE_ID, Operation.INDEX, Optional.of(new ChangeIndexEvent("foo", 1, false, "instance-id"))); - verify(indexerMock, times(1)).index(any(Change.class)); + verify(indexerMock, times(1)).index(any(ChangeNotes.class)); } @Test @@ -172,13 +171,13 @@ return null; }) .when(indexerMock) - .index(any(Change.class)); + .index(any(ChangeNotes.class)); assertThat(Context.isForwardedEvent()).isFalse(); handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty()); assertThat(Context.isForwardedEvent()).isFalse(); - verify(indexerMock, times(1)).index(any(Change.class)); + verify(indexerMock, times(1)).index(any(ChangeNotes.class)); } @Test @@ -191,7 +190,7 @@ throw new IOException("someMessage"); }) .when(indexerMock) - .index(any(Change.class)); + .index(any(ChangeNotes.class)); assertThat(Context.isForwardedEvent()).isFalse(); IOException thrown = @@ -201,7 +200,7 @@ assertThat(thrown).hasMessageThat().isEqualTo("someMessage"); assertThat(Context.isForwardedEvent()).isFalse(); - verify(indexerMock, times(1)).index(any(Change.class)); + verify(indexerMock, times(1)).index(any(ChangeNotes.class)); } private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate) @@ -226,7 +225,7 @@ when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock); when(changeCheckerPresentMock.getChangeNotes()).thenReturn(Optional.of(changeNotes)); if (storageException) { - doThrow(new StorageException("io-error")).when(indexerMock).index(any(Change.class)); + doThrow(new StorageException("io-error")).when(indexerMock).index(any(ChangeNotes.class)); } }