Merge "Also sync change deletions when indexSync is enabled"
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
index 71bc08a..92834bd 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
@@ -19,6 +19,9 @@
 import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
@@ -27,9 +30,17 @@
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.index.query.Predicate;
+import com.google.gerrit.index.query.QueryParseException;
+import com.google.gerrit.index.query.QueryResult;
+import com.google.gerrit.server.change.ChangeFinder;
 import com.google.gerrit.server.index.IndexExecutor;
 import com.google.gerrit.server.index.change.ChangeIndexCollection;
 import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.query.change.ChangeData;
+import com.google.gerrit.server.query.change.ChangeQueryBuilder;
+import com.google.gerrit.server.query.change.ChangeQueryProcessor;
 import com.google.inject.Provider;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
@@ -39,6 +50,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 
@@ -56,6 +68,9 @@
   private final ChangeIndexer.Factory changeIndexerFactory;
   private final ListeningExecutorService executor;
   private final ChangeIndexCollection changeIndexes;
+  private final ChangeQueryBuilder queryBuilder;
+  private final Provider<ChangeQueryProcessor> queryProcessorProvider;
+  private final ChangeFinder changeFinder;
   private final String age;
 
   @AssistedInject
@@ -67,6 +82,9 @@
       ChangeIndexer.Factory changeIndexerFactory,
       @IndexExecutor(BATCH) ListeningExecutorService executor,
       ChangeIndexCollection changeIndexes,
+      ChangeQueryBuilder queryBuilder,
+      Provider<ChangeQueryProcessor> queryProcessorProvider,
+      ChangeFinder changeFinder,
       @Assisted String age) {
     this.peerInfoProvider = peerInfoProvider;
     this.httpClient = httpClient;
@@ -75,6 +93,9 @@
     this.changeIndexerFactory = changeIndexerFactory;
     this.executor = executor;
     this.changeIndexes = changeIndexes;
+    this.queryBuilder = queryBuilder;
+    this.queryProcessorProvider = queryProcessorProvider;
+    this.changeFinder = changeFinder;
     this.age = age;
   }
 
@@ -124,6 +145,8 @@
       return false;
     }
 
+    syncChangeDeletions(ids, indexer);
+
     return true;
   }
 
@@ -141,4 +164,40 @@
     log.atInfo().log("Scheduling async reindex of: %s", id);
     return indexer.asyncReindexIfStale(projectName, Change.id(changeNumber));
   }
+
+  private void syncChangeDeletions(List<String> theirChanges, ChangeIndexer indexer) {
+    Set<String> ourChanges = queryLocalIndex();
+    for (String d : Sets.difference(ourChanges, ImmutableSet.copyOf(theirChanges))) {
+      deleteIfMissingInNoteDb(d, indexer);
+    }
+  }
+
+  private Set<String> queryLocalIndex() {
+    ChangeQueryProcessor queryProcessor = queryProcessorProvider.get();
+    queryProcessor.enforceVisibility(false);
+    queryProcessor.setNoLimit(true);
+    Predicate<ChangeData> predicate = Predicate.not(queryBuilder.age(age));
+    QueryResult<ChangeData> result;
+    try {
+      result = queryProcessor.query(predicate);
+    } catch (QueryParseException e) {
+      throw new RuntimeException(e);
+    }
+
+    ImmutableList<ChangeData> cds = result.entities();
+    return cds.stream()
+        .map(cd -> cd.project().get() + "~" + cd.getId().get())
+        .collect(Collectors.toSet());
+  }
+
+  private void deleteIfMissingInNoteDb(String id, ChangeIndexer indexer) {
+    List<ChangeNotes> changeNotes = changeFinder.find(id);
+    if (changeNotes.isEmpty()) {
+      List<String> parts = Splitter.on("~").splitToList(id);
+      Project.NameKey project = Project.nameKey(parts.get(0));
+      Change.Id changeId = Change.id(Integer.parseInt(parts.get(1)));
+      log.atInfo().log("Change %s present in index but not in noteDb. Deleting from index", id);
+      indexer.deleteAsync(project, changeId);
+    }
+  }
 }