Merge "Implement LockManager extension using files to represent locks"
diff --git a/BUILD b/BUILD
index 9bb0445..3735cb3 100644
--- a/BUILD
+++ b/BUILD
@@ -20,10 +20,11 @@
],
resources = glob(["src/main/resources/**/*"]),
deps = [
- "@jgroups//jar",
- "@jgroups-kubernetes//jar",
- "@failsafe//jar",
- ":global-refdb-neverlink",
+ ":global-refdb-neverlink",
+ "@auto-value//jar",
+ "@failsafe//jar",
+ "@jgroups-kubernetes//jar",
+ "@jgroups//jar",
],
)
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
index 71bc08a..92834bd 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/indexsync/IndexSyncRunner.java
@@ -19,6 +19,9 @@
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import com.google.common.flogger.FluentLogger;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
@@ -27,9 +30,17 @@
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.index.query.Predicate;
+import com.google.gerrit.index.query.QueryParseException;
+import com.google.gerrit.index.query.QueryResult;
+import com.google.gerrit.server.change.ChangeFinder;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.change.ChangeIndexCollection;
import com.google.gerrit.server.index.change.ChangeIndexer;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.query.change.ChangeData;
+import com.google.gerrit.server.query.change.ChangeQueryBuilder;
+import com.google.gerrit.server.query.change.ChangeQueryProcessor;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
@@ -39,6 +50,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -56,6 +68,9 @@
private final ChangeIndexer.Factory changeIndexerFactory;
private final ListeningExecutorService executor;
private final ChangeIndexCollection changeIndexes;
+ private final ChangeQueryBuilder queryBuilder;
+ private final Provider<ChangeQueryProcessor> queryProcessorProvider;
+ private final ChangeFinder changeFinder;
private final String age;
@AssistedInject
@@ -67,6 +82,9 @@
ChangeIndexer.Factory changeIndexerFactory,
@IndexExecutor(BATCH) ListeningExecutorService executor,
ChangeIndexCollection changeIndexes,
+ ChangeQueryBuilder queryBuilder,
+ Provider<ChangeQueryProcessor> queryProcessorProvider,
+ ChangeFinder changeFinder,
@Assisted String age) {
this.peerInfoProvider = peerInfoProvider;
this.httpClient = httpClient;
@@ -75,6 +93,9 @@
this.changeIndexerFactory = changeIndexerFactory;
this.executor = executor;
this.changeIndexes = changeIndexes;
+ this.queryBuilder = queryBuilder;
+ this.queryProcessorProvider = queryProcessorProvider;
+ this.changeFinder = changeFinder;
this.age = age;
}
@@ -124,6 +145,8 @@
return false;
}
+ syncChangeDeletions(ids, indexer);
+
return true;
}
@@ -141,4 +164,40 @@
log.atInfo().log("Scheduling async reindex of: %s", id);
return indexer.asyncReindexIfStale(projectName, Change.id(changeNumber));
}
+
+ private void syncChangeDeletions(List<String> theirChanges, ChangeIndexer indexer) {
+ Set<String> ourChanges = queryLocalIndex();
+ for (String d : Sets.difference(ourChanges, ImmutableSet.copyOf(theirChanges))) {
+ deleteIfMissingInNoteDb(d, indexer);
+ }
+ }
+
+ private Set<String> queryLocalIndex() {
+ ChangeQueryProcessor queryProcessor = queryProcessorProvider.get();
+ queryProcessor.enforceVisibility(false);
+ queryProcessor.setNoLimit(true);
+ Predicate<ChangeData> predicate = Predicate.not(queryBuilder.age(age));
+ QueryResult<ChangeData> result;
+ try {
+ result = queryProcessor.query(predicate);
+ } catch (QueryParseException e) {
+ throw new RuntimeException(e);
+ }
+
+ ImmutableList<ChangeData> cds = result.entities();
+ return cds.stream()
+ .map(cd -> cd.project().get() + "~" + cd.getId().get())
+ .collect(Collectors.toSet());
+ }
+
+ private void deleteIfMissingInNoteDb(String id, ChangeIndexer indexer) {
+ List<ChangeNotes> changeNotes = changeFinder.find(id);
+ if (changeNotes.isEmpty()) {
+ List<String> parts = Splitter.on("~").splitToList(id);
+ Project.NameKey project = Project.nameKey(parts.get(0));
+ Change.Id changeId = Change.id(Integer.parseInt(parts.get(1)));
+ log.atInfo().log("Change %s present in index but not in noteDb. Deleting from index", id);
+ indexer.deleteAsync(project, changeId);
+ }
+ }
}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
index 776a0a4..63f65d6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProvider.java
@@ -14,6 +14,7 @@
package com.ericsson.gerrit.plugins.highavailability.peers.jgroups;
+import autovalue.shaded.com.google.common.collect.ImmutableMap;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import com.google.common.annotations.VisibleForTesting;
@@ -24,8 +25,11 @@
import com.google.inject.Provider;
import com.google.inject.Singleton;
import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
@@ -39,9 +43,8 @@
* each gerrit server publishes its url to all cluster members (publishes it to all channels).
*
* <p>This provider maintains a list of all members which joined the jgroups cluster. This may be
- * more than two. But will always pick the first node which sent its url as the peer to be returned
- * by {@link #get()}. It will continue to return that node until that node leaves the jgroups
- * cluster.
+ * more than two. The set of urls of all peers is returned by {@link #get()}. If a node leaves the
+ * jgroups cluster it's removed from this set.
*/
@Singleton
public class JGroupsPeerInfoProvider
@@ -53,8 +56,7 @@
private final String myUrl;
private JChannel channel;
- private Optional<PeerInfo> peerInfo = Optional.empty();
- private Address peerAddress;
+ private Map<Address, PeerInfo> peers = new ConcurrentHashMap<>();
@Inject
JGroupsPeerInfoProvider(
@@ -70,34 +72,28 @@
@Override
public void receive(Message msg) {
- synchronized (this) {
- if (peerAddress != null) {
- return;
- }
- peerAddress = msg.getSrc();
- String url = (String) msg.getObject();
- peerInfo = Optional.of(new PeerInfo(url));
- log.atInfo().log("receive(): Set new peerInfo: %s", url);
+ String url = (String) msg.getObject();
+ if (url == null) {
+ return;
+ }
+ Address addr = msg.getSrc();
+ PeerInfo old = peers.put(addr, new PeerInfo(url));
+ if (old == null) {
+ log.atInfo().log("receive(): Add new peerInfo: %s", url);
+ } else {
+ log.atInfo().log("receive(): Update peerInfo: from %s to %s", old.getDirectUrl(), url);
}
}
@Override
public void viewAccepted(View view) {
log.atInfo().log("viewAccepted(view: %s) called", view);
- synchronized (this) {
- if (view.getMembers().size() > 2) {
- log.atWarning().log(
- "%d members joined the jgroups cluster %s (%s). "
- + " Only two members are supported. Members: %s",
- view.getMembers().size(),
- jgroupsConfig.clusterName(),
- channel.getName(),
- view.getMembers());
- }
- if (peerAddress != null && !view.getMembers().contains(peerAddress)) {
- log.atInfo().log("viewAccepted(): removed peerInfo");
- peerAddress = null;
- peerInfo = Optional.empty();
+ Iterator<Map.Entry<Address, PeerInfo>> it = peers.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<Address, PeerInfo> e = it.next();
+ if (!view.getMembers().contains(e.getKey())) {
+ log.atInfo().log("viewAccepted(): removed peerInfo %s", e.getValue().getDirectUrl());
+ it.remove();
}
}
if (view.size() > 1) {
@@ -144,14 +140,9 @@
this.channel = channel;
}
- @VisibleForTesting
- void setPeerInfo(Optional<PeerInfo> peerInfo) {
- this.peerInfo = peerInfo;
- }
-
@Override
public Set<PeerInfo> get() {
- return peerInfo.isPresent() ? ImmutableSet.of(peerInfo.get()) : ImmutableSet.of();
+ return ImmutableSet.copyOf(peers.values());
}
@Override
@@ -167,17 +158,19 @@
channel.getName(), jgroupsConfig.clusterName());
channel.close();
}
- peerInfo = Optional.empty();
- peerAddress = null;
+ peers.clear();
}
@VisibleForTesting
- Address getPeerAddress() {
- return peerAddress;
+ Map<Address, PeerInfo> getPeers() {
+ return ImmutableMap.copyOf(peers);
}
@VisibleForTesting
- void setPeerAddress(Address peerAddress) {
- this.peerAddress = peerAddress;
+ void addPeer(Address address, PeerInfo info) {
+ if (address == null) {
+ return;
+ }
+ this.peers.put(address, info);
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java
index 83f86d0..fe6cfce 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/AbstractIndexForwardingIT.java
@@ -49,10 +49,8 @@
sysModule = "com.ericsson.gerrit.plugins.highavailability.Module",
httpModule = "com.ericsson.gerrit.plugins.highavailability.HttpModule")
public abstract class AbstractIndexForwardingIT extends LightweightPluginDaemonTest {
- private static final int PORT = 18889;
- private static final String URL = "http://localhost:" + PORT;
- @Rule public WireMockRule wireMockRule = new WireMockRule(options().port(PORT));
+ @Rule public WireMockRule wireMockRule = new WireMockRule(options().dynamicPort());
@Inject SitePaths sitePaths;
@@ -62,7 +60,7 @@
FileBasedConfig fileBasedConfig =
new FileBasedConfig(
sitePaths.etc_dir.resolve(Configuration.PLUGIN_CONFIG_FILE).toFile(), FS.DETECTED);
- fileBasedConfig.setString("peerInfo", "static", "url", URL);
+ fileBasedConfig.setString("peerInfo", "static", "url", url());
fileBasedConfig.setInt("http", null, "retryInterval", 100);
fileBasedConfig.save();
beforeAction();
@@ -88,6 +86,10 @@
verify(postRequestedFor(urlEqualTo(expectedRequest)));
}
+ private String url() {
+ return "http://localhost:" + wireMockRule.port();
+ }
+
/** Perform pre-test setup. */
protected abstract void beforeAction() throws Exception;
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
index fc8c29a..0c53959 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsKubernetesPeerInfoProviderTest.java
@@ -87,6 +87,8 @@
when(pluginConfigurationMock.jgroupsKubernetes().namespace()).thenReturn(namespace);
when(pluginConfigurationMock.jgroupsKubernetes().labels()).thenReturn(labels);
+ when(myUrlProvider.get()).thenReturn("http://127.0.0.1:7800");
+
NetworkInterface eth0 = NetworkInterface.getByName("eth0");
if (eth0 != null) {
when(finder.findAddress()).thenReturn(eth0.inetAddresses().findFirst());
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
index fd203e0..a098be6 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/peers/jgroups/JGroupsPeerInfoProviderTest.java
@@ -16,12 +16,12 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
import com.ericsson.gerrit.plugins.highavailability.Configuration;
import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import org.jgroups.Address;
import org.jgroups.JChannel;
@@ -43,7 +43,7 @@
private InetAddressFinder finder;
private JGroupsPeerInfoProvider jGroupsPeerInfoProvider;
- private Optional<PeerInfo> peerInfo;
+ private PeerInfo peerInfo;
@Mock private JChannel channel;
@Mock private MyUrlProvider myUrlProviderTest;
@Mock private Message message;
@@ -57,7 +57,7 @@
JChannel channel = new JChannelProvider(pluginConfigurationMock).get();
jGroupsPeerInfoProvider =
new JGroupsPeerInfoProvider(pluginConfigurationMock, finder, myUrlProviderTest, channel);
- peerInfo = Optional.of(new PeerInfo("test message"));
+ peerInfo = new PeerInfo("test message");
channel.setName("testChannel");
}
@@ -68,7 +68,7 @@
jGroupsPeerInfoProvider.receive(message);
- assertThat(jGroupsPeerInfoProvider.getPeerAddress()).isEqualTo(peerAddress);
+ assertThat(jGroupsPeerInfoProvider.getPeers()).containsKey(peerAddress);
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
for (PeerInfo testPeerInfo : testPeerInfoSet) {
assertThat(testPeerInfo.getDirectUrl()).contains("test message");
@@ -78,20 +78,52 @@
@Test
public void testReceiveWhenPeerAddressIsNotNull() throws Exception {
- jGroupsPeerInfoProvider.setPeerAddress(new IpAddress("checkAddress.com"));
+ lenient().when(message.getSrc()).thenReturn(peerAddress);
+ when(message.getObject()).thenReturn(null);
jGroupsPeerInfoProvider.receive(message);
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
- assertThat(testPeerInfoSet.isEmpty()).isTrue();
- assertThat(testPeerInfoSet.size()).isEqualTo(0);
+ assertThat(testPeerInfoSet).isEmpty();
+ }
+
+ @Test
+ public void testReceiveMultiplePeers() throws Exception {
+ IpAddress addr1 = new IpAddress("192.168.1.5:7800");
+ IpAddress addr2 = new IpAddress("192.168.1.6:7800");
+ IpAddress addr3 = new IpAddress("192.168.1.7:7800");
+ PeerInfo peer1 = new PeerInfo("URL1");
+ PeerInfo peer2 = new PeerInfo("URL2");
+ PeerInfo peer3 = new PeerInfo("URL3");
+
+ receive(addr1, peer1);
+ receive(addr2, peer2);
+ receive(addr3, peer3);
+
+ Set<PeerInfo> peers = jGroupsPeerInfoProvider.get();
+ assertThat(peers.size()).isEqualTo(3);
+ assertThat(peers).containsExactly(peer1, peer2, peer3);
+
+ // remove one peer with address ADDR1 from the view
+ List<Address> reducedView = List.of(addr2, addr3);
+ when(view.getMembers()).thenReturn(reducedView);
+ when(view.size()).thenReturn(2);
+ jGroupsPeerInfoProvider.setChannel(channel);
+ jGroupsPeerInfoProvider.viewAccepted(view);
+ peers = jGroupsPeerInfoProvider.get();
+ assertThat(peers.size()).isEqualTo(2);
+ assertThat(peers).containsExactly(peer2, peer3);
+ }
+
+ public void receive(final IpAddress addr, final PeerInfo peer) {
+ when(message.getSrc()).thenReturn(addr);
+ when(message.getObject()).thenReturn(peer.getDirectUrl());
+ jGroupsPeerInfoProvider.receive(message);
}
@Test(expected = None.class)
public void testViewAcceptedWithNoExceptionThrown() throws Exception {
- when(view.getMembers()).thenReturn(members);
when(view.size()).thenReturn(3);
- when(members.size()).thenReturn(3);
jGroupsPeerInfoProvider.setChannel(channel);
jGroupsPeerInfoProvider.viewAccepted(view);
}
@@ -100,37 +132,32 @@
public void testViewAcceptedWhenPeerAddressIsNotNullAndIsNotMemberOfView() {
when(view.getMembers()).thenReturn(members);
when(view.size()).thenReturn(2);
- when(members.size()).thenReturn(2);
when(members.contains(peerAddress)).thenReturn(false);
- jGroupsPeerInfoProvider.setPeerAddress(peerAddress);
- jGroupsPeerInfoProvider.setPeerInfo(peerInfo);
+ jGroupsPeerInfoProvider.addPeer(peerAddress, peerInfo);
jGroupsPeerInfoProvider.setChannel(channel);
jGroupsPeerInfoProvider.viewAccepted(view);
- assertThat(jGroupsPeerInfoProvider.getPeerAddress()).isEqualTo(null);
+ assertThat(jGroupsPeerInfoProvider.getPeers()).isEmpty();
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
- assertThat(testPeerInfoSet.isEmpty()).isTrue();
- assertThat(testPeerInfoSet.size()).isEqualTo(0);
+ assertThat(testPeerInfoSet).isEmpty();
}
@Test
- public void testConnect() throws NoSuchFieldException, IllegalAccessException {
+ public void testConnect() {
jGroupsPeerInfoProvider.connect();
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
- assertThat(testPeerInfoSet.isEmpty()).isTrue();
- assertThat(testPeerInfoSet.size()).isEqualTo(0);
+ assertThat(testPeerInfoSet).isEmpty();
}
@Test
public void testGetWhenPeerInfoIsOptionalEmpty() {
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
- assertThat(testPeerInfoSet.isEmpty()).isTrue();
- assertThat(testPeerInfoSet.size()).isEqualTo(0);
+ assertThat(testPeerInfoSet).isEmpty();
}
@Test
public void testGetWhenPeerInfoIsPresent() {
- jGroupsPeerInfoProvider.setPeerInfo(peerInfo);
+ jGroupsPeerInfoProvider.addPeer(peerAddress, peerInfo);
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
for (PeerInfo testPeerInfo : testPeerInfoSet) {
assertThat(testPeerInfo.getDirectUrl()).contains("test message");
@@ -140,12 +167,10 @@
@Test
public void testStop() throws Exception {
- jGroupsPeerInfoProvider.setPeerAddress(peerAddress);
- jGroupsPeerInfoProvider.setPeerInfo(peerInfo);
+ jGroupsPeerInfoProvider.addPeer(peerAddress, peerInfo);
jGroupsPeerInfoProvider.stop();
- assertThat(jGroupsPeerInfoProvider.getPeerAddress()).isEqualTo(null);
+ assertThat(jGroupsPeerInfoProvider.getPeers().isEmpty());
Set<PeerInfo> testPeerInfoSet = jGroupsPeerInfoProvider.get();
- assertThat(testPeerInfoSet.isEmpty()).isTrue();
- assertThat(testPeerInfoSet.size()).isEqualTo(0);
+ assertThat(testPeerInfoSet).isEmpty();
}
}