Merge "Implement LockManager extension using files to represent locks"
diff --git a/BUILD b/BUILD
index 9bb0445..3735cb3 100644
--- a/BUILD
+++ b/BUILD
@@ -20,10 +20,11 @@
     ],
     resources = glob(["src/main/resources/**/*"]),
     deps = [
-      "@jgroups//jar",
-      "@jgroups-kubernetes//jar",
-      "@failsafe//jar",
-      ":global-refdb-neverlink",
+        ":global-refdb-neverlink",
+        "@auto-value//jar",
+        "@failsafe//jar",
+        "@jgroups-kubernetes//jar",
+        "@jgroups//jar",
     ],
 )
 
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
index 71bc08a..92834bd 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
@@ -19,6 +19,9 @@
 import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import com.google.common.flogger.FluentLogger;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Futures;
@@ -27,9 +30,17 @@
 import com.google.gerrit.entities.Change;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.index.query.Predicate;
+import com.google.gerrit.index.query.QueryParseException;
+import com.google.gerrit.index.query.QueryResult;
+import com.google.gerrit.server.change.ChangeFinder;
 import com.google.gerrit.server.index.IndexExecutor;
 import com.google.gerrit.server.index.change.ChangeIndexCollection;
 import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.query.change.ChangeData;
+import com.google.gerrit.server.query.change.ChangeQueryBuilder;
+import com.google.gerrit.server.query.change.ChangeQueryProcessor;
 import com.google.inject.Provider;
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
@@ -39,6 +50,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 
@@ -56,6 +68,9 @@
   private final ChangeIndexer.Factory changeIndexerFactory;
   private final ListeningExecutorService executor;
   private final ChangeIndexCollection changeIndexes;
+  private final ChangeQueryBuilder queryBuilder;
+  private final Provider<ChangeQueryProcessor> queryProcessorProvider;
+  private final ChangeFinder changeFinder;
   private final String age;
 
   @AssistedInject
@@ -67,6 +82,9 @@
       ChangeIndexer.Factory changeIndexerFactory,
       @IndexExecutor(BATCH) ListeningExecutorService executor,
       ChangeIndexCollection changeIndexes,
+      ChangeQueryBuilder queryBuilder,
+      Provider<ChangeQueryProcessor> queryProcessorProvider,
+      ChangeFinder changeFinder,
       @Assisted String age) {
     this.peerInfoProvider = peerInfoProvider;
     this.httpClient = httpClient;
@@ -75,6 +93,9 @@
     this.changeIndexerFactory = changeIndexerFactory;
     this.executor = executor;
     this.changeIndexes = changeIndexes;
+    this.queryBuilder = queryBuilder;
+    this.queryProcessorProvider = queryProcessorProvider;
+    this.changeFinder = changeFinder;
     this.age = age;
   }
 
@@ -124,6 +145,8 @@
       return false;
     }
 
+    syncChangeDeletions(ids, indexer);
+
     return true;
   }
 
@@ -141,4 +164,40 @@
     log.atInfo().log("Scheduling async reindex of: %s", id);
     return indexer.asyncReindexIfStale(projectName, Change.id(changeNumber));
   }
+
+  private void syncChangeDeletions(List<String> theirChanges, ChangeIndexer indexer) {
+    Set<String> ourChanges = queryLocalIndex();
+    for (String d : Sets.difference(ourChanges, ImmutableSet.copyOf(theirChanges))) {
+      deleteIfMissingInNoteDb(d, indexer);
+    }
+  }
+
+  private Set<String> queryLocalIndex() {
+    ChangeQueryProcessor queryProcessor = queryProcessorProvider.get();
+    queryProcessor.enforceVisibility(false);
+    queryProcessor.setNoLimit(true);
+    Predicate<ChangeData> predicate = Predicate.not(queryBuilder.age(age));
+    QueryResult<ChangeData> result;
+    try {
+      result = queryProcessor.query(predicate);
+    } catch (QueryParseException e) {
+      throw new RuntimeException(e);
+    }
+
+    ImmutableList<ChangeData> cds = result.entities();
+    return cds.stream()
+        .map(cd -> cd.project().get() + "~" + cd.getId().get())
+        .collect(Collectors.toSet());
+  }
+
+  private void deleteIfMissingInNoteDb(String id, ChangeIndexer indexer) {
+    List<ChangeNotes> changeNotes = changeFinder.find(id);
+    if (changeNotes.isEmpty()) {
+      List<String> parts = Splitter.on("~").splitToList(id);
+      Project.NameKey project = Project.nameKey(parts.get(0));
+      Change.Id changeId = Change.id(Integer.parseInt(parts.get(1)));
+      log.atInfo().log("Change %s present in index but not in noteDb. Deleting from index", id);
+      indexer.deleteAsync(project, changeId);
+    }
+  }
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
index 776a0a4..63f65d6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
@@ -14,6 +14,7 @@
 
 package com.ericsson.gerrit.plugins.highavailability.peers.jgroups;
 
+import autovalue.shaded.com.google.common.collect.ImmutableMap;
 import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
 import com.google.common.annotations.VisibleForTesting;
@@ -24,8 +25,11 @@
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
 import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.jgroups.Address;
 import org.jgroups.JChannel;
 import org.jgroups.Message;
@@ -39,9 +43,8 @@
  * each gerrit server publishes its url to all cluster members (publishes it to all channels).
  *
  * <p>This provider maintains a list of all members which joined the jgroups cluster. This may be
- * more than two. But will always pick the first node which sent its url as the peer to be returned
- * by {@link #get()}. It will continue to return that node until that node leaves the jgroups
- * cluster.
+ * more than two. The set of urls of all peers is returned by {@link #get()}. If a node leaves the
+ * jgroups cluster it's removed from this set.
  */
 @Singleton
 public class JGroupsPeerInfoProvider
@@ -53,8 +56,7 @@
   private final String myUrl;
 
   private JChannel channel;
-  private Optional<PeerInfo> peerInfo = Optional.empty();
-  private Address peerAddress;
+  private Map<Address, PeerInfo> peers = new ConcurrentHashMap<>();
 
   @Inject
   JGroupsPeerInfoProvider(
@@ -70,34 +72,28 @@
 
   @Override
   public void receive(Message msg) {
-    synchronized (this) {
-      if (peerAddress != null) {
-        return;
-      }
-      peerAddress = msg.getSrc();
-      String url = (String) msg.getObject();
-      peerInfo = Optional.of(new PeerInfo(url));
-      log.atInfo().log("receive(): Set new peerInfo: %s", url);
+    String url = (String) msg.getObject();
+    if (url == null) {
+      return;
+    }
+    Address addr = msg.getSrc();
+    PeerInfo old = peers.put(addr, new PeerInfo(url));
+    if (old == null) {
+      log.atInfo().log("receive(): Add new peerInfo: %s", url);
+    } else {
+      log.atInfo().log("receive(): Update peerInfo: from %s to %s", old.getDirectUrl(), url);
     }
   }
 
   @Override
   public void viewAccepted(View view) {
     log.atInfo().log("viewAccepted(view: %s) called", view);
-    synchronized (this) {
-      if (view.getMembers().size() > 2) {
-        log.atWarning().log(
-            "%d members joined the jgroups cluster %s (%s). "
-                + " Only two members are supported. Members: %s",
-            view.getMembers().size(),
-            jgroupsConfig.clusterName(),
-            channel.getName(),
-            view.getMembers());
-      }
-      if (peerAddress != null && !view.getMembers().contains(peerAddress)) {
-        log.atInfo().log("viewAccepted(): removed peerInfo");
-        peerAddress = null;
-        peerInfo = Optional.empty();
+    Iterator<Map.Entry<Address, PeerInfo>> it = peers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<Address, PeerInfo> e = it.next();
+      if (!view.getMembers().contains(e.getKey())) {
+        log.atInfo().log("viewAccepted(): removed peerInfo %s", e.getValue().getDirectUrl());
+        it.remove();
       }
     }
     if (view.size() > 1) {
@@ -144,14 +140,9 @@
     this.channel = channel;
   }
 
-  @VisibleForTesting
-  void setPeerInfo(Optional<PeerInfo> peerInfo) {
-    this.peerInfo = peerInfo;
-  }
-
   @Override
   public Set<PeerInfo> get() {
-    return peerInfo.isPresent() ? ImmutableSet.of(peerInfo.get()) : ImmutableSet.of();
+    return ImmutableSet.copyOf(peers.values());
   }
 
   @Override
@@ -167,17 +158,19 @@
           channel.getName(), jgroupsConfig.clusterName());
       channel.close();
     }
-    peerInfo = Optional.empty();
-    peerAddress = null;
+    peers.clear();
   }
 
   @VisibleForTesting
-  Address getPeerAddress() {
-    return peerAddress;
+  Map<Address, PeerInfo> getPeers() {
+    return ImmutableMap.copyOf(peers);
   }
 
   @VisibleForTesting
-  void setPeerAddress(Address peerAddress) {
-    this.peerAddress = peerAddress;
+  void addPeer(Address address, PeerInfo info) {
+    if (address == null) {
+      return;
+    }
+    this.peers.put(address, info);
   }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java
index 83f86d0..fe6cfce 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java
@@ -49,10 +49,8 @@
     sysModule = "com.ericsson.gerrit.plugins.highavailability.Module",
     httpModule = "com.ericsson.gerrit.plugins.highavailability.HttpModule")
 public abstract class AbstractIndexForwardingIT extends LightweightPluginDaemonTest {
-  private static final int PORT = 18889;
-  private static final String URL = "http://localhost:" + PORT;
 
-  @Rule public WireMockRule wireMockRule = new WireMockRule(options().port(PORT));
+  @Rule public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
 
   @Inject SitePaths sitePaths;
 
@@ -62,7 +60,7 @@
     FileBasedConfig fileBasedConfig =
         new FileBasedConfig(
             sitePaths.etc_dir.resolve(Configuration.PLUGIN_CONFIG_FILE).toFile(), FS.DETECTED);
-    fileBasedConfig.setString("peerInfo", "static", "url", URL);
+    fileBasedConfig.setString("peerInfo", "static", "url", url());
     fileBasedConfig.setInt("http", null, "retryInterval", 100);
     fileBasedConfig.save();
     beforeAction();
@@ -88,6 +86,10 @@
     verify(postRequestedFor(urlEqualTo(expectedRequest)));
   }
 
+  private String url() {
+    return "http://localhost:" + wireMockRule.port();
+  }
+
   /** Perform pre-test setup. */
   protected abstract void beforeAction() throws Exception;
 
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
index fc8c29a..0c53959 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
@@ -87,6 +87,8 @@
     when(pluginConfigurationMock.jgroupsKubernetes().namespace()).thenReturn(namespace);
     when(pluginConfigurationMock.jgroupsKubernetes().labels()).thenReturn(labels);
 
+    when(myUrlProvider.get()).thenReturn("http://127.0.0.1:7800");
+
     NetworkInterface eth0 = NetworkInterface.getByName("eth0");
     if (eth0 != null) {
       when(finder.findAddress()).thenReturn(eth0.inetAddresses().findFirst());
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
index fd203e0..a098be6 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
@@ -16,12 +16,12 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
 import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import org.jgroups.Address;
 import org.jgroups.JChannel;
@@ -43,7 +43,7 @@
 
   private InetAddressFinder finder;
   private JGroupsPeerInfoProvider jGroupsPeerInfoProvider;
-  private Optional<PeerInfo> peerInfo;
+  private PeerInfo peerInfo;
   @Mock private JChannel channel;
   @Mock private MyUrlProvider myUrlProviderTest;
   @Mock private Message message;
@@ -57,7 +57,7 @@
     JChannel channel = new JChannelProvider(pluginConfigurationMock).get();
     jGroupsPeerInfoProvider =
         new JGroupsPeerInfoProvider(pluginConfigurationMock, finder, myUrlProviderTest, channel);
-    peerInfo = Optional.of(new PeerInfo("test message"));
+    peerInfo = new PeerInfo("test message");
     channel.setName("testChannel");
   }
 
@@ -68,7 +68,7 @@
 
     jGroupsPeerInfoProvider.receive(message);
 
-    assertThat(jGroupsPeerInfoProvider.getPeerAddress()).isEqualTo(peerAddress);
+    assertThat(jGroupsPeerInfoProvider.getPeers()).containsKey(peerAddress);
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
     for (PeerInfo testPeerInfo : testPeerInfoSet) {
       assertThat(testPeerInfo.getDirectUrl()).contains("test message");
@@ -78,20 +78,52 @@
 
   @Test
   public void testReceiveWhenPeerAddressIsNotNull() throws Exception {
-    jGroupsPeerInfoProvider.setPeerAddress(new IpAddress("checkAddress.com"));
+    lenient().when(message.getSrc()).thenReturn(peerAddress);
+    when(message.getObject()).thenReturn(null);
 
     jGroupsPeerInfoProvider.receive(message);
 
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
-    assertThat(testPeerInfoSet.isEmpty()).isTrue();
-    assertThat(testPeerInfoSet.size()).isEqualTo(0);
+    assertThat(testPeerInfoSet).isEmpty();
+  }
+
+  @Test
+  public void testReceiveMultiplePeers() throws Exception {
+    IpAddress addr1 = new IpAddress("192.168.1.5:7800");
+    IpAddress addr2 = new IpAddress("192.168.1.6:7800");
+    IpAddress addr3 = new IpAddress("192.168.1.7:7800");
+    PeerInfo peer1 = new PeerInfo("URL1");
+    PeerInfo peer2 = new PeerInfo("URL2");
+    PeerInfo peer3 = new PeerInfo("URL3");
+
+    receive(addr1, peer1);
+    receive(addr2, peer2);
+    receive(addr3, peer3);
+
+    Set<PeerInfo> peers = jGroupsPeerInfoProvider.get();
+    assertThat(peers.size()).isEqualTo(3);
+    assertThat(peers).containsExactly(peer1, peer2, peer3);
+
+    // remove one peer with address ADDR1 from the view
+    List<Address> reducedView = List.of(addr2, addr3);
+    when(view.getMembers()).thenReturn(reducedView);
+    when(view.size()).thenReturn(2);
+    jGroupsPeerInfoProvider.setChannel(channel);
+    jGroupsPeerInfoProvider.viewAccepted(view);
+    peers = jGroupsPeerInfoProvider.get();
+    assertThat(peers.size()).isEqualTo(2);
+    assertThat(peers).containsExactly(peer2, peer3);
+  }
+
+  public void receive(final IpAddress addr, final PeerInfo peer) {
+    when(message.getSrc()).thenReturn(addr);
+    when(message.getObject()).thenReturn(peer.getDirectUrl());
+    jGroupsPeerInfoProvider.receive(message);
   }
 
   @Test(expected = None.class)
   public void testViewAcceptedWithNoExceptionThrown() throws Exception {
-    when(view.getMembers()).thenReturn(members);
     when(view.size()).thenReturn(3);
-    when(members.size()).thenReturn(3);
     jGroupsPeerInfoProvider.setChannel(channel);
     jGroupsPeerInfoProvider.viewAccepted(view);
   }
@@ -100,37 +132,32 @@
   public void testViewAcceptedWhenPeerAddressIsNotNullAndIsNotMemberOfView() {
     when(view.getMembers()).thenReturn(members);
     when(view.size()).thenReturn(2);
-    when(members.size()).thenReturn(2);
     when(members.contains(peerAddress)).thenReturn(false);
-    jGroupsPeerInfoProvider.setPeerAddress(peerAddress);
-    jGroupsPeerInfoProvider.setPeerInfo(peerInfo);
+    jGroupsPeerInfoProvider.addPeer(peerAddress, peerInfo);
     jGroupsPeerInfoProvider.setChannel(channel);
     jGroupsPeerInfoProvider.viewAccepted(view);
 
-    assertThat(jGroupsPeerInfoProvider.getPeerAddress()).isEqualTo(null);
+    assertThat(jGroupsPeerInfoProvider.getPeers()).isEmpty();
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
-    assertThat(testPeerInfoSet.isEmpty()).isTrue();
-    assertThat(testPeerInfoSet.size()).isEqualTo(0);
+    assertThat(testPeerInfoSet).isEmpty();
   }
 
   @Test
-  public void testConnect() throws NoSuchFieldException, IllegalAccessException {
+  public void testConnect() {
     jGroupsPeerInfoProvider.connect();
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
-    assertThat(testPeerInfoSet.isEmpty()).isTrue();
-    assertThat(testPeerInfoSet.size()).isEqualTo(0);
+    assertThat(testPeerInfoSet).isEmpty();
   }
 
   @Test
   public void testGetWhenPeerInfoIsOptionalEmpty() {
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
-    assertThat(testPeerInfoSet.isEmpty()).isTrue();
-    assertThat(testPeerInfoSet.size()).isEqualTo(0);
+    assertThat(testPeerInfoSet).isEmpty();
   }
 
   @Test
   public void testGetWhenPeerInfoIsPresent() {
-    jGroupsPeerInfoProvider.setPeerInfo(peerInfo);
+    jGroupsPeerInfoProvider.addPeer(peerAddress, peerInfo);
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
     for (PeerInfo testPeerInfo : testPeerInfoSet) {
       assertThat(testPeerInfo.getDirectUrl()).contains("test message");
@@ -140,12 +167,10 @@
 
   @Test
   public void testStop() throws Exception {
-    jGroupsPeerInfoProvider.setPeerAddress(peerAddress);
-    jGroupsPeerInfoProvider.setPeerInfo(peerInfo);
+    jGroupsPeerInfoProvider.addPeer(peerAddress, peerInfo);
     jGroupsPeerInfoProvider.stop();
-    assertThat(jGroupsPeerInfoProvider.getPeerAddress()).isEqualTo(null);
+    assertThat(jGroupsPeerInfoProvider.getPeers().isEmpty());
     Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
-    assertThat(testPeerInfoSet.isEmpty()).isTrue();
-    assertThat(testPeerInfoSet.size()).isEqualTo(0);
+    assertThat(testPeerInfoSet).isEmpty();
   }
 }