Merge "e2e-tests: Reuse the now available numberKey from GerritSimulation" into stable-3.1
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index 4743bee..093628e 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -53,6 +53,7 @@
// common parameters to cache and index sections
static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
+ static final String BATCH_THREAD_POOL_SIZE_KEY = "batchThreadPoolSize";
static final int DEFAULT_INDEX_MAX_TRIES = 2;
static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
static final int DEFAULT_THREAD_POOL_SIZE = 4;
@@ -453,6 +454,7 @@
static final boolean DEFAULT_SYNCHRONIZE_FORCED = true;
private final int threadPoolSize;
+ private final int batchThreadPoolSize;
private final int retryInterval;
private final int maxTries;
private final int numStripedLocks;
@@ -461,6 +463,7 @@
private Index(Config cfg) {
super(cfg, INDEX_SECTION);
threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
+ batchThreadPoolSize = getInt(cfg, INDEX_SECTION, BATCH_THREAD_POOL_SIZE_KEY, threadPoolSize);
numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
@@ -472,6 +475,10 @@
return threadPoolSize;
}
+ public int batchThreadPoolSize() {
+ return batchThreadPoolSize;
+ }
+
public int numStripedLocks() {
return numStripedLocks;
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
index 6255986..561bb19 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/IndexTs.java
@@ -48,7 +48,10 @@
private final Path dataDir;
private final ScheduledExecutorService exec;
- private final FlusherRunner flusher;
+ private final FlusherRunner changeFlusher;
+ private final FlusherRunner accountFlusher;
+ private final FlusherRunner groupFlusher;
+ private final FlusherRunner projectFlusher;
private final ChangeFinder changeFinder;
private final CurrentRequestContext currCtx;
@@ -58,16 +61,11 @@
private volatile LocalDateTime projectTs;
class FlusherRunner implements Runnable {
+ private final AbstractIndexRestApiServlet.IndexName index;
@Override
public void run() {
- store(AbstractIndexRestApiServlet.IndexName.CHANGE, changeTs);
- store(AbstractIndexRestApiServlet.IndexName.ACCOUNT, accountTs);
- store(AbstractIndexRestApiServlet.IndexName.GROUP, groupTs);
- store(AbstractIndexRestApiServlet.IndexName.PROJECT, projectTs);
- }
-
- private void store(AbstractIndexRestApiServlet.IndexName index, LocalDateTime latestTs) {
+ LocalDateTime latestTs = getIndexTimeStamp();
Optional<LocalDateTime> currTs = getUpdateTs(index);
if (!currTs.isPresent() || latestTs.isAfter(currTs.get())) {
Path indexTsFile = dataDir.resolve(index.name().toLowerCase());
@@ -78,6 +76,25 @@
}
}
}
+
+ FlusherRunner(AbstractIndexRestApiServlet.IndexName index) {
+ this.index = index;
+ }
+
+ private LocalDateTime getIndexTimeStamp() {
+ switch (index) {
+ case CHANGE:
+ return changeTs;
+ case GROUP:
+ return groupTs;
+ case ACCOUNT:
+ return accountTs;
+ case PROJECT:
+ return projectTs;
+ default:
+ throw new IllegalArgumentException("Unsupported index " + index);
+ }
+ }
}
@Inject
@@ -88,7 +105,10 @@
CurrentRequestContext currCtx) {
this.dataDir = dataDir;
this.exec = queue.getDefaultQueue();
- this.flusher = new FlusherRunner();
+ this.changeFlusher = new FlusherRunner(AbstractIndexRestApiServlet.IndexName.CHANGE);
+ this.accountFlusher = new FlusherRunner(AbstractIndexRestApiServlet.IndexName.ACCOUNT);
+ this.groupFlusher = new FlusherRunner(AbstractIndexRestApiServlet.IndexName.GROUP);
+ this.projectFlusher = new FlusherRunner(AbstractIndexRestApiServlet.IndexName.PROJECT);
this.changeFinder = changeFinder;
this.currCtx = currCtx;
}
@@ -147,19 +167,22 @@
switch (index) {
case CHANGE:
changeTs = dateTime;
+ exec.execute(changeFlusher);
break;
case ACCOUNT:
accountTs = dateTime;
+ exec.execute(accountFlusher);
break;
case GROUP:
groupTs = dateTime;
+ exec.execute(groupFlusher);
break;
case PROJECT:
projectTs = dateTime;
+ exec.execute(projectFlusher);
break;
default:
throw new IllegalArgumentException("Unsupported index " + index);
}
- exec.execute(flusher);
}
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java
new file mode 100644
index 0000000..dee8876
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexBatchChangeHandler.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl.Factory;
+import com.ericsson.gerrit.plugins.highavailability.index.ForwardedBatchIndexExecutor;
+import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.util.concurrent.ScheduledExecutorService;
+
+@Singleton
+public class ForwardedIndexBatchChangeHandler extends ForwardedIndexChangeHandler {
+
+ @Inject
+ ForwardedIndexBatchChangeHandler(
+ ChangeIndexer indexer,
+ Configuration config,
+ @ForwardedBatchIndexExecutor ScheduledExecutorService indexExecutor,
+ OneOffRequestContext oneOffCtx,
+ Factory changeCheckerFactory) {
+ super(indexer, config, indexExecutor, oneOffCtx, changeCheckerFactory);
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
index bb47f11..b73b676 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
@@ -42,6 +42,18 @@
CompletableFuture<Boolean> indexChange(String projectName, int changeId, IndexEvent indexEvent);
/**
+ * Forward a change indexing event to the other master using batch index endpoint.
+ *
+ * @param projectName the project of the change to index.
+ * @param changeId the change to index.
+ * @param indexEvent the details of the index event.
+ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of
+ * false.
+ */
+ CompletableFuture<Boolean> batchIndexChange(
+ String projectName, int changeId, IndexEvent indexEvent);
+
+ /**
* Forward a delete change from index event to the other master.
*
* @param changeId the change to remove from the index.
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
index 037c1c6..71cf044 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
@@ -21,10 +21,14 @@
public class IndexEvent {
public long eventCreatedOn = System.currentTimeMillis() / 1000;
public String targetSha;
+ public String metaSha;
@Override
public String toString() {
- return "IndexEvent@" + format(eventCreatedOn) + ((targetSha != null) ? "/" + targetSha : "");
+ return "IndexEvent@"
+ + format(eventCreatedOn)
+ + ((targetSha != null) ? "/target:" + targetSha : "")
+ + ((metaSha != null) ? "/meta:" + metaSha : "");
}
public static String format(long eventTs) {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java
new file mode 100644
index 0000000..bebfae9
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java
@@ -0,0 +1,37 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.forwarder.rest;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexBatchChangeHandler;
+import com.google.gerrit.extensions.restapi.Url;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class IndexBatchChangeRestApiServlet extends AbstractIndexRestApiServlet<String> {
+ private static final long serialVersionUID = -1L;
+
+ @Inject
+ IndexBatchChangeRestApiServlet(ForwardedIndexBatchChangeHandler handler, @EventGson Gson gson) {
+ super(handler, IndexName.CHANGE, true, gson);
+ }
+
+ @Override
+ String parse(String id) {
+ return Url.decode(id);
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
index d404ab2..ea4ad10 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -83,6 +83,16 @@
event);
}
+ public CompletableFuture<Boolean> batchIndexChange(
+ String projectName, int changeId, IndexEvent event) {
+ return execute(
+ RequestMethod.POST,
+ "index change",
+ "index/change/batch",
+ buildIndexEndpoint(projectName, changeId),
+ event);
+ }
+
@Override
public CompletableFuture<Boolean> deleteChangeFromIndex(final int changeId, IndexEvent event) {
return execute(
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderServletModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderServletModule.java
index 589bbef..4d3de37 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderServletModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderServletModule.java
@@ -20,6 +20,7 @@
@Override
protected void configureServlets() {
serveRegex("/index/account/\\d+$").with(IndexAccountRestApiServlet.class);
+ serveRegex("/index/change/batch/.*$").with(IndexBatchChangeRestApiServlet.class);
serveRegex("/index/change/.*$").with(IndexChangeRestApiServlet.class);
serveRegex("/index/group/\\w+$").with(IndexGroupRestApiServlet.class);
serveRegex("/index/project/.*$").with(IndexProjectRestApiServlet.class);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutor.java
new file mode 100644
index 0000000..06da9f0
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+@interface BatchIndexExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutorProvider.java
new file mode 100644
index 0000000..cfbd4fb
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/BatchIndexExecutorProvider.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class BatchIndexExecutorProvider extends ExecutorProvider {
+
+ @Inject
+ BatchIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
+ super(workQueue, config.index().batchThreadPoolSize(), "Forward-BatchIndex-Event");
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
index 4a0cad7..23712be 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
@@ -18,6 +18,7 @@
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Comment;
+import com.google.gerrit.entities.RefNames;
import com.google.gerrit.server.CommentsUtil;
import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.git.GitRepositoryManager;
@@ -62,15 +63,25 @@
}
@Override
- public Optional<IndexEvent> newIndexEvent() {
- return getComputedChangeTs()
- .map(
- ts -> {
- IndexEvent event = new IndexEvent();
- event.eventCreatedOn = ts;
- event.targetSha = getBranchTargetSha();
- return event;
- });
+ public Optional<IndexEvent> newIndexEvent() throws IOException {
+ Optional<Long> changeTs = getComputedChangeTs();
+ if (!changeTs.isPresent()) {
+ return Optional.empty();
+ }
+
+ long ts = changeTs.get();
+
+ IndexEvent event = new IndexEvent();
+ event.eventCreatedOn = ts;
+ try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName())) {
+ event.targetSha = getBranchTargetSha();
+ event.metaSha = getMetaSha(repo);
+ return Optional.of(event);
+ } catch (IOException e) {
+ log.atSevere().withCause(e).log(
+ "Unable to create index event for project %s", changeNotes.get().getProjectName());
+ throw e;
+ }
}
@Override
@@ -82,25 +93,31 @@
}
@Override
- public boolean isChangeUpToDate(Optional<IndexEvent> indexEvent) {
+ public boolean isChangeUpToDate(Optional<IndexEvent> indexEventOption) throws IOException {
getComputedChangeTs();
- log.atFine().log("Checking change %s against index event %s", this, indexEvent);
+ log.atFine().log("Checking change %s against index event %s", this, indexEventOption);
if (!computedChangeTs.isPresent()) {
log.atWarning().log("Unable to compute last updated ts for change %s", changeId);
return false;
}
+ try {
+ if (indexEventOption.isPresent()) {
+ try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName())) {
+ IndexEvent indexEvent = indexEventOption.get();
+ return (computedChangeTs.get() > indexEvent.eventCreatedOn)
+ || (computedChangeTs.get() == indexEvent.eventCreatedOn)
+ && (Objects.isNull(indexEvent.targetSha)
+ || Objects.equals(getBranchTargetSha(), indexEvent.targetSha))
+ && (Objects.isNull(indexEvent.metaSha)
+ || Objects.equals(getMetaSha(repo), indexEvent.metaSha));
+ }
+ }
+ return true;
- if (indexEvent.isPresent() && indexEvent.get().targetSha == null) {
- return indexEvent.map(e -> (computedChangeTs.get() >= e.eventCreatedOn)).orElse(true);
+ } catch (IOException ex) {
+ log.atWarning().log("Unable to read meta sha for change %s", changeId);
+ return false;
}
-
- return indexEvent
- .map(
- e ->
- (computedChangeTs.get() > e.eventCreatedOn)
- || (computedChangeTs.get() == e.eventCreatedOn)
- && (Objects.equals(getBranchTargetSha(), e.targetSha)))
- .orElse(true);
}
@Override
@@ -113,12 +130,19 @@
@Override
public String toString() {
- return "change-id="
- + changeId
- + "@"
- + getComputedChangeTs().map(IndexEvent::format)
- + "/"
- + getBranchTargetSha();
+ try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName())) {
+ return "change-id="
+ + changeId
+ + "@"
+ + getComputedChangeTs().map(IndexEvent::format)
+ + "/target:"
+ + getBranchTargetSha()
+ + "/meta:"
+ + getMetaSha(repo);
+ } catch (IOException e) {
+ log.atSevere().withCause(e).log("Unable to render change %s", changeId);
+ return "change-id=" + changeId;
+ }
}
private String getBranchTargetSha() {
@@ -141,6 +165,16 @@
return getChangeNotes().map(this::getTsFromChangeAndDraftComments);
}
+ private String getMetaSha(Repository repo) throws IOException {
+ String refName = RefNames.changeMetaRef(changeNotes.get().getChange().getId());
+ Ref ref = repo.exactRef(refName);
+ if (ref == null) {
+ throw new IOException(
+ String.format("Unable to find meta ref %s for change %s", refName, changeId));
+ }
+ return ref.getTarget().getObjectId().getName();
+ }
+
private long getTsFromChangeAndDraftComments(ChangeNotes notes) {
Change change = notes.getChange();
Timestamp changeTs = change.getLastUpdatedOn();
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutor.java
new file mode 100644
index 0000000..b5b6aab
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ForwardedBatchIndexExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
new file mode 100644
index 0000000..f5bc85e
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedBatchIndexExecutorProvider.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class ForwardedBatchIndexExecutorProvider extends ExecutorProvider {
+
+ @Inject
+ ForwardedBatchIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
+ super(workQueue, config.index().batchThreadPoolSize(), "Forwarded-BatchIndex-Event");
+ }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 368581c..73e842f 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -40,6 +40,7 @@
ProjectIndexedListener {
private static final FluentLogger log = FluentLogger.forEnclosingClass();
private final ScheduledExecutorService executor;
+ private final ScheduledExecutorService batchExecutor;
private final Forwarder forwarder;
private final String pluginName;
private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -53,6 +54,7 @@
@Inject
IndexEventHandler(
@IndexExecutor ScheduledExecutorService executor,
+ @BatchIndexExecutor ScheduledExecutorService batchExecutor,
@PluginName String pluginName,
Forwarder forwarder,
ChangeCheckerImpl.Factory changeChecker,
@@ -61,6 +63,7 @@
IndexEventLocks locks) {
this.forwarder = forwarder;
this.executor = executor;
+ this.batchExecutor = batchExecutor;
this.pluginName = pluginName;
this.changeChecker = changeChecker;
this.currCtx = currCtx;
@@ -95,11 +98,21 @@
changeChecker
.create(changeId)
.newIndexEvent()
- .map(event -> new IndexChangeTask(projectName, id, event))
+ .map(
+ event -> {
+ if (Thread.currentThread().getName().contains("Batch")) {
+ return new BatchIndexChangeTask(projectName, id, event);
+ }
+ return new IndexChangeTask(projectName, id, event);
+ })
.ifPresent(
task -> {
if (queuedTasks.add(task)) {
- executor.execute(task);
+ if (task instanceof BatchIndexChangeTask) {
+ batchExecutor.execute(task);
+ } else {
+ executor.execute(task);
+ }
}
});
} catch (Exception e) {
@@ -215,6 +228,46 @@
}
}
+ class BatchIndexChangeTask extends IndexTask {
+ private final int changeId;
+ private final String projectName;
+
+ BatchIndexChangeTask(String projectName, int changeId, IndexEvent indexEvent) {
+ super(indexEvent);
+ this.projectName = projectName;
+ this.changeId = changeId;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> execute() {
+ return forwarder.batchIndexChange(projectName, changeId, indexEvent);
+ }
+
+ @Override
+ String indexId() {
+ return "change/" + changeId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(IndexEventHandler.BatchIndexChangeTask.class, changeId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof IndexEventHandler.BatchIndexChangeTask)) {
+ return false;
+ }
+ IndexEventHandler.BatchIndexChangeTask other = (IndexEventHandler.BatchIndexChangeTask) obj;
+ return changeId == other.changeId;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("[%s] Index change %s in target instance", pluginName, changeId);
+ }
+ }
+
class DeleteChangeTask extends IndexTask {
private final int changeId;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index 4a25fc8..3bcc34f 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -35,6 +35,12 @@
.annotatedWith(ForwardedIndexExecutor.class)
.toProvider(ForwardedIndexExecutorProvider.class);
bind(IndexEventLocks.class).in(Scopes.SINGLETON);
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(BatchIndexExecutor.class)
+ .toProvider(BatchIndexExecutorProvider.class);
+ bind(ScheduledExecutorService.class)
+ .annotatedWith(ForwardedBatchIndexExecutor.class)
+ .toProvider(ForwardedBatchIndexExecutorProvider.class);
listener().to(IndexExecutorProvider.class);
DynamicSet.bind(binder(), ChangeIndexedListener.class)
.to(IndexEventHandler.class)
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 482d301..5186bd1 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -200,6 +200,11 @@
: Maximum number of threads used to send index events to the target instance.
Defaults to 4.
+```index.batchThreadPoolSize```
+: Maximum number of threads used to send batch index events to the target instance
+ and not associated to an interactive action performed by a user.
+ Defaults equal index.threadPoolSize.
+
```index.maxTries```
: Maximum number of times the plugin should attempt to reindex changes.
Setting this value to 0 will disable retries. After this number of failed tries,
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
index a9537bd..3444625 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/ConfigurationTest.java
@@ -14,6 +14,7 @@
package com.ericsson.gerrit.plugins.highavailability;
+import static com.ericsson.gerrit.plugins.highavailability.Configuration.BATCH_THREAD_POOL_SIZE_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.CACHE_SECTION;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.Cache.PATTERN_KEY;
import static com.ericsson.gerrit.plugins.highavailability.Configuration.DEFAULT_NUM_STRIPED_LOCKS;
@@ -266,6 +267,19 @@
}
@Test
+ public void testGetBatchIndexThreadPoolSize() throws Exception {
+ assertThat(getConfiguration().index().batchThreadPoolSize())
+ .isEqualTo(DEFAULT_THREAD_POOL_SIZE);
+
+ globalPluginConfig.setInt(INDEX_SECTION, null, BATCH_THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+ assertThat(getConfiguration().index().batchThreadPoolSize()).isEqualTo(THREAD_POOL_SIZE);
+
+ globalPluginConfig.setString(INDEX_SECTION, null, BATCH_THREAD_POOL_SIZE_KEY, INVALID_INT);
+ assertThat(getConfiguration().index().batchThreadPoolSize())
+ .isEqualTo(DEFAULT_THREAD_POOL_SIZE);
+ }
+
+ @Test
public void testGetIndexSynchronize() throws Exception {
assertThat(getConfiguration().index().synchronize()).isEqualTo(DEFAULT_SYNCHRONIZE);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
index 046d6ad..7908669 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -68,6 +68,14 @@
PLUGIN_NAME,
"index/change",
PROJECT_NAME_URL_END + "~" + CHANGE_NUMBER);
+ private static final String INDEX_BATCH_CHANGE_ENDPOINT =
+ Joiner.on("/")
+ .join(
+ URL,
+ PLUGINS,
+ PLUGIN_NAME,
+ "index/change/batch",
+ PROJECT_NAME_URL_END + "~" + CHANGE_NUMBER);
private static final String DELETE_CHANGE_ENDPOINT =
Joiner.on("/").join(URL, PLUGINS, PLUGIN_NAME, "index/change", "~" + CHANGE_NUMBER);
private static final int ACCOUNT_NUMBER = 2;
@@ -198,6 +206,29 @@
}
@Test
+ public void testIndexBatchChangeOK() throws Exception {
+ when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any()))
+ .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
+ assertThat(forwarder.batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()).get())
+ .isTrue();
+ }
+
+ @Test
+ public void testIndexBatchChangeFailed() throws Exception {
+ when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any()))
+ .thenReturn(new HttpResult(FAILED, EMPTY_MSG));
+ assertThat(forwarder.batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()).get())
+ .isFalse();
+ }
+
+ @Test
+ public void testIndexBatchChangeThrowsException() throws Exception {
+ doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any());
+ assertThat(forwarder.batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()).get())
+ .isFalse();
+ }
+
+ @Test
public void testChangeDeletedFromIndexOK() throws Exception {
when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT)))
.thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG));
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerIT.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerIT.java
new file mode 100644
index 0000000..febf8f0
--- /dev/null
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerIT.java
@@ -0,0 +1,127 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.entities.RefNames;
+import java.io.IOException;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.junit.Test;
+
+@TestPlugin(
+ name = "high-availability",
+ sysModule = "com.ericsson.gerrit.plugins.highavailability.Module",
+ httpModule = "com.ericsson.gerrit.plugins.highavailability.HttpModule")
+public class ChangeCheckerIT extends LightweightPluginDaemonTest {
+
+ ChangeCheckerImpl.Factory changeCheckerFactory;
+
+ @Override
+ public void setUpTestPlugin() throws Exception {
+ super.setUpTestPlugin();
+ changeCheckerFactory = plugin.getSysInjector().getInstance(ChangeCheckerImpl.Factory.class);
+ }
+
+ @Test
+ public void shouldPopulateMetaSha() throws Exception {
+ Result change = createChange();
+ ChangeChecker changeChecker = changeCheckerFactory.create(change.getChangeId());
+ Optional<IndexEvent> eventOption = changeChecker.newIndexEvent();
+
+ assertThat(eventOption.isPresent()).isTrue();
+ IndexEvent event = eventOption.get();
+ assertThat(event.metaSha).isNotNull();
+ assertThat(event.metaSha).isEqualTo(readMetaSha(change));
+ }
+
+ @Test
+ public void shouldReturnIsUpToDateTrueWhenEventContainsCorrectMetaAndTargetSha()
+ throws Exception {
+ Result change = createChange();
+ ChangeChecker changeChecker = changeCheckerFactory.create(change.getChangeId());
+ Optional<IndexEvent> event = changeChecker.newIndexEvent();
+
+ assertThat(changeChecker.isChangeUpToDate(event)).isTrue();
+ }
+
+ @Test
+ public void shouldReturnIsUpToDateTrueWhenTargetShaIsNull() throws Exception {
+ Result change = createChange();
+ ChangeChecker changeChecker = changeCheckerFactory.create(change.getChangeId());
+ Optional<IndexEvent> event =
+ changeChecker
+ .newIndexEvent()
+ .map(
+ e -> {
+ e.targetSha = null;
+ return e;
+ });
+
+ assertThat(changeChecker.isChangeUpToDate(event)).isTrue();
+ }
+
+ @Test
+ public void shouldReturnFalseWhenMetaShaIsNotUpToDate() throws Exception {
+ String testMetaRefSha = "6212efebe6e8b9f439a8ad013243e602afab7441";
+ Result change = createChange();
+ ChangeChecker changeChecker = changeCheckerFactory.create(change.getChangeId());
+ Optional<IndexEvent> event =
+ changeChecker
+ .newIndexEvent()
+ .map(
+ e -> {
+ e.metaSha = testMetaRefSha;
+ return e;
+ });
+
+ assertThat(changeChecker.isChangeUpToDate(event)).isFalse();
+ }
+
+ @Test
+ public void shouldReturnFalseWhenTargetShaIsNotUpToDate() throws Exception {
+ String testTargetRefSha = "abed47baf2818a86b68cf712073a748a6b5b293e";
+ Result change = createChange();
+ ChangeChecker changeChecker = changeCheckerFactory.create(change.getChangeId());
+ Optional<IndexEvent> event =
+ changeChecker
+ .newIndexEvent()
+ .map(
+ e -> {
+ e.targetSha = testTargetRefSha;
+ return e;
+ });
+
+ assertThat(changeChecker.isChangeUpToDate(event)).isFalse();
+ }
+
+ private String readMetaSha(Result change) throws IOException {
+ try (Repository repo = repoManager.openRepository(change.getChange().change().getProject())) {
+ String refName = RefNames.changeMetaRef(change.getChange().getId());
+ Ref ref = repo.exactRef(refName);
+ if (ref == null) {
+ return null;
+ }
+
+ return ref.getTarget().getObjectId().getName();
+ }
+ }
+}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 875f203..578dc17 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -85,6 +85,7 @@
private Account.Id accountId;
private AccountGroup.UUID accountGroupUUID;
private ScheduledExecutorService executor = new CurrentThreadScheduledExecutorService();
+ private ScheduledExecutorService batchExecutor = new CurrentThreadScheduledExecutorService();
private ScheduledExecutorService testExecutor =
Executors.newScheduledThreadPool(MAX_TEST_PARALLELISM);
@Mock private RequestContext mockCtx;
@@ -142,6 +143,7 @@
indexEventHandler =
new IndexEventHandler(
executor,
+ batchExecutor,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
@@ -387,9 +389,11 @@
@Test
public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
+ ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
new IndexEventHandler(
poolMock,
+ poolBatchMock,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
@@ -405,9 +409,11 @@
@Test
public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
+ ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
new IndexEventHandler(
poolMock,
+ poolBatchMock,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,
@@ -422,9 +428,11 @@
@Test
public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
+ ScheduledThreadPoolExecutor poolBatchMock = mock(ScheduledThreadPoolExecutor.class);
indexEventHandler =
new IndexEventHandler(
poolMock,
+ poolBatchMock,
PLUGIN_NAME,
forwarder,
changeCheckerFactoryMock,