Merge "Also sync change deletions when indexSync is enabled"
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
index 71bc08a..92834bd 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
@@ -19,6 +19,9 @@
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
@@ -27,9 +30,17 @@
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.index.query.Predicate;
+import com.google.gerrit.index.query.QueryParseException;
+import com.google.gerrit.index.query.QueryResult;
+import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.change.ChangeIndexCollection;
import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.query.change.ChangeData;
+import com.google.gerrit.server.query.change.ChangeQueryBuilder;
+import com.google.gerrit.server.query.change.ChangeQueryProcessor;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
@@ -39,6 +50,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -56,6 +68,9 @@
private final ChangeIndexer.Factory changeIndexerFactory;
private final ListeningExecutorService executor;
private final ChangeIndexCollection changeIndexes;
+ private final ChangeQueryBuilder queryBuilder;
+ private final Provider<ChangeQueryProcessor> queryProcessorProvider;
+ private final ChangeFinder changeFinder;
private final String age;
@AssistedInject
@@ -67,6 +82,9 @@
ChangeIndexer.Factory changeIndexerFactory,
@IndexExecutor(BATCH) ListeningExecutorService executor,
ChangeIndexCollection changeIndexes,
+ ChangeQueryBuilder queryBuilder,
+ Provider<ChangeQueryProcessor> queryProcessorProvider,
+ ChangeFinder changeFinder,
@Assisted String age) {
this.peerInfoProvider = peerInfoProvider;
this.httpClient = httpClient;
@@ -75,6 +93,9 @@
this.changeIndexerFactory = changeIndexerFactory;
this.executor = executor;
this.changeIndexes = changeIndexes;
+ this.queryBuilder = queryBuilder;
+ this.queryProcessorProvider = queryProcessorProvider;
+ this.changeFinder = changeFinder;
this.age = age;
}
@@ -124,6 +145,8 @@
return false;
}
+ syncChangeDeletions(ids, indexer);
+
return true;
}
@@ -141,4 +164,40 @@
log.atInfo().log("Scheduling async reindex of: %s", id);
return indexer.asyncReindexIfStale(projectName, Change.id(changeNumber));
}
+
+ /** Deletes locally indexed ids that are absent from {@code theirChanges} and from NoteDb. */
+ private void syncChangeDeletions(List<String> theirChanges, ChangeIndexer indexer) {
+   Set<String> localIds = queryLocalIndex();
+   Sets.difference(localIds, ImmutableSet.copyOf(theirChanges))
+       .forEach(localOnly -> deleteIfMissingInNoteDb(localOnly, indexer));
+ }
+
+ /** Returns the {@code project~number} id of each change matching the negated age predicate. */
+ private Set<String> queryLocalIndex() {
+   ChangeQueryProcessor processor = queryProcessorProvider.get();
+   // Query without visibility enforcement and without a result limit.
+   processor.enforceVisibility(false);
+   processor.setNoLimit(true);
+   Predicate<ChangeData> negatedAge = Predicate.not(queryBuilder.age(age));
+   ImmutableList<ChangeData> matches;
+   try {
+     matches = processor.query(negatedAge).entities();
+   } catch (QueryParseException e) {
+     throw new RuntimeException(e);
+   }
+   return matches.stream()
+       .map(change -> change.project().get() + "~" + change.getId().get())
+       .collect(Collectors.toSet());
+ }
+
+ /** Deletes {@code id} from the index via {@code deleteAsync} when changeFinder finds no notes. */
+ private void deleteIfMissingInNoteDb(String id, ChangeIndexer indexer) {
+   if (!changeFinder.find(id).isEmpty()) {
+     return; // changeFinder still resolves this id, so the index entry is kept
+   }
+   List<String> projectAndNumber = Splitter.on("~").splitToList(id);
+   int changeNumber = Integer.parseInt(projectAndNumber.get(1));
+   log.atInfo().log("Change %s present in index but not in noteDb. Deleting from index", id);
+   indexer.deleteAsync(Project.nameKey(projectAndNumber.get(0)), Change.id(changeNumber));
+ }
}