Merge branch 'stable-3.2'

* stable-3.2:
  Exclude special refs from fetch calls
  Parallelise REST API fetch calls

Change-Id: I5e57945e7c4e793c9d1e2cc891c023093d8b3dac
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RefsFilter.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RefsFilter.java
new file mode 100644
index 0000000..0b9a691
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/RefsFilter.java
@@ -0,0 +1,102 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.entities.AccessSection;
+import com.google.gerrit.entities.RefNames;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import java.util.List;
+import org.eclipse.jgit.lib.Config;
+
+/** Filter telling which refs are excluded from pull replication fetch calls. */
+@Singleton
+public class RefsFilter {
+  public enum PatternType {
+    REGEX,
+    WILDCARD,
+    EXACT_MATCH;
+
+    public static PatternType getPatternType(String pattern) {
+      if (pattern.startsWith(AccessSection.REGEX_PREFIX)) {
+        return REGEX;
+      }
+      return pattern.endsWith("*") ? WILDCARD : EXACT_MATCH;
+    }
+  }
+
+  /** Exclusion patterns: the built-in defaults followed by the configured excludeRefs values. */
+  private final List<String> refsPatterns;
+
+  @Inject
+  public RefsFilter(ReplicationConfig replicationConfig) {
+    refsPatterns = getRefNamePatterns(replicationConfig.getConfig());
+  }
+
+  /**
+   * Returns true if refName matches any exclusion pattern (defaults plus excludeRefs).
+   *
+   * @throws IllegalArgumentException if refName is null or empty
+   */
+  public boolean match(String refName) {
+    if (Strings.isNullOrEmpty(refName)) {
+      throw new IllegalArgumentException(
+          String.format("Ref name cannot be null or empty, but was %s", refName));
+    }
+    // An empty pattern list excludes nothing, so it needs no special case.
+    for (String pattern : refsPatterns) {
+      if (matchesPattern(refName, pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private List<String> getRefNamePatterns(Config cfg) {
+    return ImmutableList.<String>builder()
+        .addAll(getDefaultExcludeRefPatterns())
+        .addAll(ImmutableList.copyOf(cfg.getStringList("replication", null, "excludeRefs")))
+        .build();
+  }
+
+  /** Applies one pattern as a regex, a trailing-{@code *} prefix match, or an exact match. */
+  private boolean matchesPattern(String refName, String pattern) {
+    switch (PatternType.getPatternType(pattern)) {
+      case REGEX:
+        return refName.matches(pattern);
+      case WILDCARD:
+        return refName.startsWith(pattern.substring(0, pattern.length() - 1));
+      case EXACT_MATCH:
+        return refName.equals(pattern);
+      default:
+        return false;
+    }
+  }
+
+  private List<String> getDefaultExcludeRefPatterns() {
+    return ImmutableList.of(
+        RefNames.REFS_USERS + "*",
+        RefNames.REFS_CONFIG,
+        RefNames.REFS_SEQUENCES + "*",
+        RefNames.REFS_EXTERNAL_IDS,
+        RefNames.REFS_GROUPS + "*",
+        RefNames.REFS_GROUPNAMES,
+        RefNames.REFS_CACHE_AUTOMERGE + "*",
+        RefNames.REFS_STARRED_CHANGES + "*");
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
index 2158a59..fc5ed7d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueue.java
@@ -32,6 +32,10 @@
 import java.util.HashSet;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
@@ -43,6 +47,7 @@
   static final String PULL_REPLICATION_LOG_NAME = "pull_replication_log";
   static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
 
+  private static final Integer DEFAULT_FETCH_CALLS_TIMEOUT = 0;
   private final ReplicationStateListener stateLog;
 
   private final WorkQueue workQueue;
@@ -52,6 +57,8 @@
   private volatile boolean replaying;
   private final Queue<ReferenceUpdatedEvent> beforeStartupEventsQueue;
   private FetchRestApiClient.Factory fetchClientFactory;
+  private Integer fetchCallsTimeout;
+  private RefsFilter refsFilter;
 
   @Inject
   ReplicationQueue(
@@ -59,19 +66,28 @@
       Provider<SourcesCollection> rd,
       DynamicItem<EventDispatcher> dis,
       ReplicationStateListeners sl,
-      FetchRestApiClient.Factory fetchClientFactory) {
+      FetchRestApiClient.Factory fetchClientFactory,
+      RefsFilter refsFilter) {
     workQueue = wq;
     dispatcher = dis;
     sources = rd;
     stateLog = sl;
     beforeStartupEventsQueue = Queues.newConcurrentLinkedQueue();
     this.fetchClientFactory = fetchClientFactory;
+    this.refsFilter = refsFilter;
   }
 
   @Override
   public void start() {
     if (!running) {
       sources.get().startup(workQueue);
+      fetchCallsTimeout =
+          2
+              * sources.get().getAll().stream()
+                  .mapToInt(Source::getConnectionTimeout)
+                  .max()
+                  .orElse(DEFAULT_FETCH_CALLS_TIMEOUT);
+
       running = true;
       fireBeforeStartupEvents();
     }
@@ -98,7 +114,13 @@
 
   @Override
   public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
-    fire(event.getProjectName(), ObjectId.fromString(event.getNewObjectId()), event.getRefName());
+    if (isRefToBeReplicated(event.getRefName())) {
+      fire(event.getProjectName(), ObjectId.fromString(event.getNewObjectId()), event.getRefName());
+    }
+  }
+
+  private boolean isRefToBeReplicated(String refName) {
+    return !refsFilter.match(refName); // the filter matches refs that must be excluded
   }
 
   private void fire(String projectName, ObjectId objectId, String refName) {
@@ -116,33 +138,61 @@
       beforeStartupEventsQueue.add(ReferenceUpdatedEvent.create(project.get(), refName, objectId));
       return;
     }
+    ForkJoinPool fetchCallsPool = null;
+    try {
+      fetchCallsPool = new ForkJoinPool(sources.get().getAll().size());
+      fetchCallsPool
+          .submit(
+              () ->
+                  sources
+                      .get()
+                      .getAll()
+                      .parallelStream()
+                      .forEach(
+                          source -> {
+                            callFetch(source, project, refName, state);
+                          }))
+          .get(fetchCallsTimeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      stateLog.error(
+          String.format(
+              "Exception during the pull replication fetch rest api call.  Message:%s",
+              e.getMessage()),
+          e,
+          state);
+    } finally {
+      if (fetchCallsPool != null) {
+        fetchCallsPool.shutdown();
+      }
+    }
+  }
 
-    for (Source cfg : sources.get().getAll()) {
-      if (cfg.wouldFetchProject(project) && cfg.wouldFetchRef(refName)) {
-        for (String apiUrl : cfg.getApis()) {
-          try {
-            URIish uri = new URIish(apiUrl);
-            FetchRestApiClient fetchClient = fetchClientFactory.create(cfg);
+  private void callFetch(
+      Source source, Project.NameKey project, String refName, ReplicationState state) {
+    if (source.wouldFetchProject(project) && source.wouldFetchRef(refName)) {
+      for (String apiUrl : source.getApis()) {
+        try {
+          URIish uri = new URIish(apiUrl);
+          FetchRestApiClient fetchClient = fetchClientFactory.create(source);
 
-            HttpResult result = fetchClient.callFetch(project, refName, uri);
+          HttpResult result = fetchClient.callFetch(project, refName, uri);
 
-            if (!result.isSuccessful()) {
-              stateLog.warn(
-                  String.format(
-                      "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s",
-                      apiUrl, result.getMessage().orElse("unknown")),
-                  state);
-            }
-          } catch (URISyntaxException e) {
-            stateLog.warn(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
-          } catch (Exception e) {
-            stateLog.error(
+          if (!result.isSuccessful()) {
+            stateLog.warn(
                 String.format(
-                    "Exception during the pull replication fetch rest api call. Endpoint url:%s, message:%s",
-                    apiUrl, e.getMessage()),
-                e,
+                    "Pull replication rest api fetch call failed. Endpoint url: %s, reason:%s",
+                    apiUrl, result.getMessage().orElse("unknown")),
                 state);
           }
+        } catch (URISyntaxException e) {
+          stateLog.warn(String.format("Cannot parse pull replication api url:%s", apiUrl), state);
+        } catch (Exception e) {
+          stateLog.error(
+              String.format(
+                  "Exception during the pull replication fetch rest api call. Endpoint url:%s, message:%s",
+                  apiUrl, e.getMessage()),
+              e,
+              state);
         }
       }
     }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index c5d13a0..ae2c2fd 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -147,6 +147,40 @@
 
 	Default: 50
 
+replication.excludeRefs
+:   Specify which refs should be excluded from git fetch calls. It can be provided
+    more than once, and supports three formats: regular expressions, wildcard
+    matching, and single ref matching. All three formats are case-sensitive.
+
+    Values starting with a caret `^` are treated as regular
+    expressions. For details on regular expression syntax, see the
+    official [Java documentation](https://docs.oracle.com/javase/tutorial/essential/regex/).
+
+    Please note that regular expressions could also be used
+    with inverse match.
+
+    Values that are not regular expressions and end in `*` are
+    treated as wildcard matches. Wildcards match refs whose
+    name agrees from the beginning until the trailing `*`. So
+    `foo/b*` would match the refs `foo/b`, `foo/bar`, and
+    `foo/baz`, but neither `foobar`, nor `bar/foo/baz`.
+
+    Values that are neither regular expressions nor wildcards are
+    treated as single ref matches. So `foo/bar` matches only
+    the ref `foo/bar`, but no other refs.
+
+    The following refs are always excluded from git fetch calls:
+    - refs/users/*
+    - refs/meta/config
+    - refs/sequences/*
+    - refs/meta/external-ids
+    - refs/groups/*
+    - refs/meta/group-names
+    - refs/cache-automerge/*
+    - refs/starred-changes/*
+
+    By default, all other refs are included.
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
new file mode 100644
index 0000000..61520f4
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationQueueTest.java
@@ -0,0 +1,194 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static java.nio.file.Files.createTempDirectory;
+
+import com.google.gerrit.extensions.api.changes.NotifyHandling;
+import com.google.gerrit.extensions.common.AccountInfo;
+import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.GitReferenceUpdatedListener.Event;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Provider;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReplicationQueueTest {
+  @Mock private WorkQueue wq;
+  @Mock private Provider<SourcesCollection> rd;
+  @Mock private DynamicItem<EventDispatcher> dis;
+  @Mock ReplicationStateListeners sl;
+  @Mock FetchRestApiClient.Factory fetchClientFactory;
+  @Mock AccountInfo accountInfo;
+
+  RefsFilter refsFilter;
+
+  private ReplicationQueue objectUnderTest;
+  private SitePaths sitePaths;
+  private Path pluginDataPath;
+
+  @Before
+  public void setup() throws IOException {
+    Path sitePath = createTempPath("site");
+    sitePaths = new SitePaths(sitePath);
+    pluginDataPath = createTempPath("data"); // field, not a new local, so tests can read it
+    ReplicationConfig replicationConfig = new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
+    refsFilter = new RefsFilter(replicationConfig);
+    objectUnderTest = new ReplicationQueue(wq, rd, dis, sl, fetchClientFactory, refsFilter);
+  }
+
+  @Test
+  public void shouldSkipEventWhenUsersRef() {
+    Event event = new TestEvent("refs/users/00/1000000");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenGroupsRef() {
+    Event event = new TestEvent("refs/groups/a1/a16d5b33cc789d60b682c654f03f9cc2feb12975");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenGroupNamesRef() {
+    Event event = new TestEvent("refs/meta/group-names");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenMultiSequenceRef() {
+    Event event = new TestEvent("refs/sequences/changes");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenMultiSiteVersionRef() throws IOException {
+    FileBasedConfig fileConfig =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    fileConfig.setString("replication", null, "excludeRefs", "refs/multi-site/version");
+    fileConfig.save();
+    ReplicationConfig replicationConfig = new ReplicationFileBasedConfig(sitePaths, pluginDataPath);
+    refsFilter = new RefsFilter(replicationConfig);
+    objectUnderTest = new ReplicationQueue(wq, rd, dis, sl, fetchClientFactory, refsFilter);
+    Event event = new TestEvent("refs/multi-site/version");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenStarredChangesRef() {
+    Event event = new TestEvent("refs/starred-changes/41/2941/1000000");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenConfigRef() {
+    Event event = new TestEvent("refs/meta/config");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  @Test
+  public void shouldSkipEventWhenExternalIdsRef() {
+    Event event = new TestEvent("refs/meta/external-ids");
+    objectUnderTest.onGitReferenceUpdated(event);
+
+    Mockito.verifyZeroInteractions(wq, rd, dis, sl, fetchClientFactory, accountInfo);
+  }
+
+  protected static Path createTempPath(String prefix) throws IOException {
+    return createTempDirectory(prefix);
+  }
+
+  private static class TestEvent implements GitReferenceUpdatedListener.Event {
+    private final String refName; // the only populated value; every other getter is a stub
+
+    public TestEvent(String refName) {
+      this.refName = refName;
+    }
+
+    @Override
+    public String getRefName() {
+      return refName;
+    }
+
+    @Override
+    public String getProjectName() {
+      return null;
+    }
+
+    @Override
+    public NotifyHandling getNotify() {
+      return null;
+    }
+
+    @Override
+    public String getOldObjectId() {
+      return null;
+    }
+
+    @Override
+    public String getNewObjectId() {
+      return null;
+    }
+
+    @Override
+    public boolean isCreate() {
+      return false;
+    }
+
+    @Override
+    public boolean isDelete() {
+      return false;
+    }
+
+    @Override
+    public boolean isNonFastForward() {
+      return false;
+    }
+
+    @Override
+    public AccountInfo getUpdater() {
+      return null;
+    }
+  }
+}