Merge branch 'stable-3.1'

* stable-3.1:
  Jenkinsfile: use gerrit-ci-library pipeline for plugin validation
  Mute logging for missing fetch param in remote section
  Fix NPE issue from pull replication after configuration auto reload
  Make HTTP connection parameters source-aware
  Fix issue with auto reload of sources
  Add 'Pull Replication' capability
  Adopt new replication configuration structure

Change-Id: I5d1cfdba092afc8ba7abd2f52561f1ee575fdd90
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..448692b
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,2 @@
+pluginPipeline(formatCheckId: 'gerritforge:pull-replication-format-3852e64366bb37d13b8baf8af9b15cfd38eb9227',
+               buildCheckId: 'gerritforge:pull-replication-3852e64366bb37d13b8baf8af9b15cfd38eb9227')
\ No newline at end of file
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 1b079dd..9add07c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -32,28 +32,33 @@
 import com.google.inject.internal.UniqueAnnotations;
 import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import com.googlesource.gerrit.plugins.replication.AutoReloadSecureCredentialsFactoryDecorator;
+import com.googlesource.gerrit.plugins.replication.ConfigParser;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.FanoutReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.MainReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.ObservableQueue;
 import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
-import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
 import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
 import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule;
 import com.googlesource.gerrit.plugins.replication.pull.client.FetchRestApiClient;
-import com.googlesource.gerrit.plugins.replication.pull.client.HttpClientProvider;
+import com.googlesource.gerrit.plugins.replication.pull.client.HttpClient;
+import com.googlesource.gerrit.plugins.replication.pull.client.SourceHttpClient;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.errors.ConfigInvalidException;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.util.FS;
 
 class PullReplicationModule extends AbstractModule {
+  private final SitePaths site;
   private final Path cfgPath;
 
   @Inject
   public PullReplicationModule(SitePaths site) {
+    this.site = site;
     cfgPath = site.etc_dir.resolve("replication.config");
   }
 
@@ -62,7 +67,11 @@
 
     install(new PullReplicationApiModule());
 
-    bind(CloseableHttpClient.class).toProvider(HttpClientProvider.class).in(Scopes.SINGLETON);
+    install(
+        new FactoryModuleBuilder()
+            .implement(HttpClient.class, SourceHttpClient.class)
+            .build(SourceHttpClient.Factory.class));
+
     install(new FactoryModuleBuilder().build(Source.Factory.class));
     install(new FactoryModuleBuilder().build(FetchRestApiClient.Factory.class));
 
@@ -94,15 +103,18 @@
 
     DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReplicationQueue.class);
 
-    bind(ReplicationConfigValidator.class).to(SourcesCollection.class);
+    bind(ConfigParser.class).to(SourceConfigParser.class).in(Scopes.SINGLETON);
 
     if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
-      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+      bind(ReplicationConfig.class)
+          .annotatedWith(MainReplicationConfig.class)
+          .to(getReplicationConfigClass());
+      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class).in(Scopes.SINGLETON);
       bind(LifecycleListener.class)
           .annotatedWith(UniqueAnnotations.create())
           .to(AutoReloadConfigDecorator.class);
     } else {
-      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+      bind(ReplicationConfig.class).to(getReplicationConfigClass()).in(Scopes.SINGLETON);
     }
 
     DynamicSet.setOf(binder(), ReplicationStateListener.class);
@@ -122,4 +134,11 @@
     }
     return config;
   }
+
+  private Class<? extends ReplicationConfig> getReplicationConfigClass() {
+    if (Files.exists(site.etc_dir.resolve("replication"))) {
+      return FanoutReplicationConfig.class;
+    }
+    return ReplicationFileBasedConfig.class;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 0f03929..3a4d4a7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.common.data.GroupReference;
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.entities.BranchNameKey;
@@ -73,7 +74,9 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -85,6 +88,7 @@
 
 public class Source {
   private static final Logger repLog = PullReplicationLogger.repLog;
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   public interface Factory {
     Source create(SourceConfiguration config);
@@ -103,6 +107,7 @@
   private final PerThreadRequestScope.Scoper threadScoper;
   private final SourceConfiguration config;
   private final DynamicItem<EventDispatcher> eventDispatcher;
+  private CloseableHttpClient httpClient;
 
   protected enum RetryReason {
     TRANSPORT_ERROR,
@@ -194,6 +199,14 @@
     threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
   }
 
+  public synchronized CloseableHttpClient memoize(
+      Supplier<CloseableHttpClient> httpClientSupplier) {
+    if (httpClient == null) {
+      httpClient = httpClientSupplier.get();
+    }
+    return httpClient;
+  }
+
   private void addRecursiveParents(
       AccountGroup.UUID g,
       Builder<AccountGroup.UUID> builder,
@@ -218,12 +231,21 @@
     pool = workQueue.createQueue(config.getPoolThreads(), poolName);
   }
 
-  public int shutdown() {
+  public synchronized int shutdown() {
     int cnt = 0;
     if (pool != null) {
       cnt = pool.shutdownNow().size();
       pool = null;
     }
+    if (httpClient != null) {
+      try {
+        httpClient.close();
+        httpClient = null;
+      } catch (IOException e) {
+        logger.atSevere().withCause(e).log("Error occurred while closing HTTP client connections");
+      }
+    }
+
     return cnt;
   }
 
@@ -647,6 +669,22 @@
     return config.getApis();
   }
 
+  public int getConnectionTimeout() {
+    return config.getConnectionTimeout();
+  }
+
+  public int getIdleTimeout() {
+    return config.getIdleTimeout();
+  }
+
+  public int getMaxConnectionsPerRoute() {
+    return config.getMaxConnectionsPerRoute();
+  }
+
+  public int getMaxConnections() {
+    return config.getMaxConnections();
+  }
+
   public int getMaxRetries() {
     return config.getMaxRetries();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfigParser.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfigParser.java
new file mode 100644
index 0000000..6dfed44
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfigParser.java
@@ -0,0 +1,93 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.googlesource.gerrit.plugins.replication.ConfigParser;
+import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+public class SourceConfigParser implements ConfigParser {
+
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  /* (non-Javadoc)
+   * @see com.googlesource.gerrit.plugins.replication.ConfigParser#parseRemotes(org.eclipse.jgit.lib.Config)
+   */
+  public List<RemoteConfiguration> parseRemotes(Config config) throws ConfigInvalidException {
+
+    if (config.getSections().isEmpty()) {
+      logger.atWarning().log("Replication config does not exist or it's empty; not replicating");
+      return Collections.emptyList();
+    }
+
+    ImmutableList.Builder<RemoteConfiguration> sourceConfigs = ImmutableList.builder();
+    for (RemoteConfig c : allFetchRemotes(config)) {
+      if (c.getURIs().isEmpty()) {
+        continue;
+      }
+
+      // fetch source has to be specified.
+      if (c.getFetchRefSpecs().isEmpty()) {
+        throw new ConfigInvalidException(
+            String.format("You must specify a valid refSpec for this remote"));
+      }
+
+      SourceConfiguration sourceConfig = new SourceConfiguration(c, config);
+
+      if (!sourceConfig.isSingleProjectMatch()) {
+        for (URIish u : c.getURIs()) {
+          if (u.getPath() == null || !u.getPath().contains("${name}")) {
+            throw new ConfigInvalidException(
+                String.format("remote.%s.url \"%s\" lacks ${name} placeholder", c.getName(), u));
+          }
+        }
+      }
+      sourceConfigs.add(sourceConfig);
+    }
+    return sourceConfigs.build();
+  }
+
+  private static List<RemoteConfig> allFetchRemotes(Config cfg) throws ConfigInvalidException {
+
+    Set<String> names = cfg.getSubsections("remote");
+    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      try {
+        final RemoteConfig remoteConfig = new RemoteConfig(cfg, name);
+        if (!remoteConfig.getFetchRefSpecs().isEmpty()) {
+          result.add(remoteConfig);
+        } else {
+          logger.atFine().log(
+              "Skip loading of remote [remote \"%s\"], since it has no 'fetch' configuration",
+              name);
+        }
+      } catch (URISyntaxException e) {
+        throw new ConfigInvalidException(
+            String.format("remote %s has invalid URL in %s", name, cfg));
+      }
+    }
+    return result;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index 009d952..04fae31 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -26,6 +26,9 @@
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
+  static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
+  static final int DEFAULT_CONNECTION_TIMEOUT_MS = 5000;
+  static final int DEFAULT_CONNECTIONS_PER_ROUTE = 100;
 
   private final int delay;
   private final int rescheduleDelay;
@@ -41,6 +44,10 @@
   private final ImmutableList<String> authGroupNames;
   private final RemoteConfig remoteConfig;
   private final ImmutableList<String> apis;
+  private final int connectionTimeout;
+  private final int idleTimeout;
+  private final int maxConnectionsPerRoute;
+  private final int maxConnections;
   private final int maxRetries;
   private int slowLatencyThreshold;
 
@@ -49,6 +56,12 @@
     String name = remoteConfig.getName();
     urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url"));
     apis = ImmutableList.copyOf(cfg.getStringList("remote", name, "apiUrl"));
+    connectionTimeout =
+        cfg.getInt("remote", name, "connectionTimeout", DEFAULT_CONNECTION_TIMEOUT_MS);
+    idleTimeout = cfg.getInt("remote", name, "idleTimeout", DEFAULT_MAX_CONNECTION_INACTIVITY_MS);
+    maxConnectionsPerRoute =
+        cfg.getInt("replication", "maxConnectionsPerRoute", DEFAULT_CONNECTIONS_PER_ROUTE);
+    maxConnections = cfg.getInt("replication", "maxConnections", 2 * maxConnectionsPerRoute);
     delay = Math.max(0, getInt(remoteConfig, cfg, "replicationdelay", DEFAULT_REPLICATION_DELAY));
     rescheduleDelay =
         Math.max(3, getInt(remoteConfig, cfg, "rescheduledelay", DEFAULT_RESCHEDULE_DELAY));
@@ -109,6 +122,22 @@
     return apis;
   }
 
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
+
+  public int getIdleTimeout() {
+    return idleTimeout;
+  }
+
+  public int getMaxConnectionsPerRoute() {
+    return maxConnectionsPerRoute;
+  }
+
+  public int getMaxConnections() {
+    return maxConnections;
+  }
+
   @Override
   public ImmutableList<String> getAdminUrls() {
     return adminUrls;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
index f24d25e..7be4971 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
@@ -16,41 +16,41 @@
 
 import static java.util.stream.Collectors.toList;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
+import com.google.inject.Provider;
 import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.ConfigParser;
 import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
-import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
-import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
-import java.io.IOException;
-import java.net.URISyntaxException;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import org.eclipse.jgit.errors.ConfigInvalidException;
-import org.eclipse.jgit.storage.file.FileBasedConfig;
-import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.URIish;
 
 @Singleton
-public class SourcesCollection implements ReplicationSources, ReplicationConfigValidator {
+public class SourcesCollection implements ReplicationSources {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   private final Source.Factory sourceFactory;
   private volatile List<Source> sources;
   private boolean shuttingDown;
+  private final Provider<ReplicationQueue> replicationQueue;
 
   @Inject
   public SourcesCollection(
-      ReplicationFileBasedConfig replicationConfig, Source.Factory sourceFactory, EventBus eventBus)
+      ReplicationConfig replicationConfig,
+      ConfigParser configParser,
+      Source.Factory sourceFactory,
+      EventBus eventBus,
+      Provider<ReplicationQueue> replicationQueue)
       throws ConfigInvalidException {
     this.sourceFactory = sourceFactory;
-    this.sources = allSources(sourceFactory, validateConfig(replicationConfig));
+    this.sources =
+        allSources(sourceFactory, configParser.parseRemotes(replicationConfig.getConfig()));
+    this.replicationQueue = replicationQueue;
     eventBus.register(this);
   }
 
@@ -113,69 +113,12 @@
       logger.atWarning().log("Shutting down: configuration reload ignored");
       return;
     }
-
-    sources = allSources(sourceFactory, sourceConfigurations);
-    logger.atInfo().log("Configuration reloaded: %d sources", getAll().size());
-  }
-
-  @Override
-  public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig)
-      throws ConfigInvalidException {
-
     try {
-      newConfig.getConfig().load();
-    } catch (IOException e) {
-      throw new ConfigInvalidException(
-          String.format("Cannot read %s: %s", newConfig.getConfig().getFile(), e.getMessage()), e);
+      replicationQueue.get().stop();
+      sources = allSources(sourceFactory, sourceConfigurations);
+      logger.atInfo().log("Configuration reloaded: %d sources", getAll().size());
+    } finally {
+      replicationQueue.get().start();
     }
-
-    ImmutableList.Builder<RemoteConfiguration> sourceConfigs = ImmutableList.builder();
-    for (RemoteConfig c : allFetchRemotes(newConfig.getConfig())) {
-      if (c.getURIs().isEmpty()) {
-        continue;
-      }
-
-      // fetch source has to be specified.
-      if (c.getFetchRefSpecs().isEmpty()) {
-        throw new ConfigInvalidException(
-            String.format("You must specify a valid refSpec for this remote"));
-      }
-
-      SourceConfiguration sourceConfig = new SourceConfiguration(c, newConfig.getConfig());
-
-      if (!sourceConfig.isSingleProjectMatch()) {
-        for (URIish u : c.getURIs()) {
-          if (u.getPath() == null || !u.getPath().contains("${name}")) {
-            throw new ConfigInvalidException(
-                String.format("remote.%s.url \"%s\" lacks ${name} placeholder", c.getName(), u));
-          }
-        }
-      }
-      sourceConfigs.add(sourceConfig);
-    }
-    return sourceConfigs.build();
-  }
-
-  private static List<RemoteConfig> allFetchRemotes(FileBasedConfig cfg)
-      throws ConfigInvalidException {
-
-    Set<String> names = cfg.getSubsections("remote");
-    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
-    for (String name : names) {
-      try {
-        final RemoteConfig remoteConfig = new RemoteConfig(cfg, name);
-        if (!remoteConfig.getFetchRefSpecs().isEmpty()) {
-          result.add(remoteConfig);
-        } else {
-          logger.atWarning().log(
-              "Skip loading of remote [remote \"%s\"], since it has no 'fetch' configuration",
-              name);
-        }
-      } catch (URISyntaxException e) {
-        throw new ConfigInvalidException(
-            String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
-      }
-    }
-    return result;
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
index d51590d..c44bcbc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -20,6 +20,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestApiException;
@@ -40,13 +41,18 @@
   private final FetchCommand command;
   private final WorkQueue workQueue;
   private final DynamicItem<UrlFormatter> urlFormatter;
+  private final FetchPreconditions preConditions;
 
   @Inject
   public FetchAction(
-      FetchCommand command, WorkQueue workQueue, DynamicItem<UrlFormatter> urlFormatter) {
+      FetchCommand command,
+      WorkQueue workQueue,
+      DynamicItem<UrlFormatter> urlFormatter,
+      FetchPreconditions preConditions) {
     this.command = command;
     this.workQueue = workQueue;
     this.urlFormatter = urlFormatter;
+    this.preConditions = preConditions;
   }
 
   public static class Input {
@@ -57,6 +63,10 @@
 
   @Override
   public Response<?> apply(ProjectResource resource, Input input) throws RestApiException {
+
+    if (!preConditions.canCallFetchApi()) {
+      throw new AuthException("not allowed to call fetch command");
+    }
     try {
       if (Strings.isNullOrEmpty(input.label)) {
         throw new BadRequestException("Source label cannot be null or empty");
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchApiCapability.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchApiCapability.java
new file mode 100644
index 0000000..73a4ac5
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchApiCapability.java
@@ -0,0 +1,26 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.extensions.config.CapabilityDefinition;
+
+public class FetchApiCapability extends CapabilityDefinition {
+  static final String CALL_FETCH_ACTION = "callFetchAction";
+
+  @Override
+  public String getDescription() {
+    return "Pull Replication";
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java
new file mode 100644
index 0000000..ca1557a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchPreconditions.java
@@ -0,0 +1,47 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.googlesource.gerrit.plugins.replication.pull.api.FetchApiCapability.CALL_FETCH_ACTION;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.extensions.api.access.PluginPermission;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.permissions.GlobalPermission;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+
+public class FetchPreconditions {
+  private final String pluginName;
+  private final PermissionBackend permissionBackend;
+  private final Provider<CurrentUser> userProvider;
+
+  @Inject
+  public FetchPreconditions(
+      @PluginName String pluginName,
+      Provider<CurrentUser> userProvider,
+      PermissionBackend permissionBackend) {
+    this.pluginName = pluginName;
+    this.userProvider = userProvider;
+    this.permissionBackend = permissionBackend;
+  }
+
+  public Boolean canCallFetchApi() {
+    PermissionBackend.WithUser userPermission = permissionBackend.user(userProvider.get());
+    return userPermission.testOrFalse(GlobalPermission.ADMINISTRATE_SERVER)
+        || userPermission.testOrFalse(new PluginPermission(pluginName, CALL_FETCH_ACTION));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
index 470c985..1663ad2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
@@ -15,7 +15,10 @@
 package com.googlesource.gerrit.plugins.replication.pull.api;
 
 import static com.google.gerrit.server.project.ProjectResource.PROJECT_KIND;
+import static com.googlesource.gerrit.plugins.replication.pull.api.FetchApiCapability.CALL_FETCH_ACTION;
 
+import com.google.gerrit.extensions.annotations.Exports;
+import com.google.gerrit.extensions.config.CapabilityDefinition;
 import com.google.gerrit.extensions.restapi.RestApiModule;
 import com.google.inject.Scopes;
 
@@ -24,5 +27,10 @@
   protected void configure() {
     bind(FetchAction.class).in(Scopes.SINGLETON);
     post(PROJECT_KIND, "fetch").to(FetchAction.class);
+
+    bind(FetchPreconditions.class).in(Scopes.SINGLETON);
+    bind(CapabilityDefinition.class)
+        .annotatedWith(Exports.named(CALL_FETCH_ACTION))
+        .to(FetchApiCapability.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index 4296f67..502d3b7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -20,7 +20,7 @@
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
-import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
 import com.googlesource.gerrit.plugins.replication.pull.Source;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -36,7 +36,6 @@
 import org.apache.http.client.protocol.HttpClientContext;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.util.EntityUtils;
 import org.eclipse.jgit.transport.CredentialItem;
@@ -51,18 +50,18 @@
   }
 
   private final CredentialsFactory credentials;
-  private final CloseableHttpClient httpClient;
+  private final SourceHttpClient.Factory httpClientFactory;
   private final Source source;
   private final String instanceLabel;
 
   @Inject
   FetchRestApiClient(
       CredentialsFactory credentials,
-      CloseableHttpClient httpClient,
-      ReplicationFileBasedConfig replicationConfig,
+      SourceHttpClient.Factory httpClientFactory,
+      ReplicationConfig replicationConfig,
       @Assisted Source source) {
     this.credentials = credentials;
-    this.httpClient = httpClient;
+    this.httpClientFactory = httpClientFactory;
     this.source = source;
     this.instanceLabel =
         replicationConfig.getConfig().getString("replication", null, "instanceLabel");
@@ -81,7 +80,7 @@
             String.format("{\"label\":\"%s\", \"ref_name\": \"%s\"}", instanceLabel, refName),
             StandardCharsets.UTF_8));
     post.addHeader(new BasicHeader("Content-Type", "application/json"));
-    return httpClient.execute(post, this, getContext(targetUri));
+    return httpClientFactory.create(source).execute(post, this, getContext(targetUri));
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
new file mode 100644
index 0000000..7bfc7d1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClient.java
@@ -0,0 +1,31 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import java.io.IOException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.protocol.HttpContext;
+
+/** HTTP client for executing URI requests to a remote site */
+public interface HttpClient {
+
+  public <T> T execute(
+      final HttpUriRequest request,
+      final ResponseHandler<? extends T> responseHandler,
+      final HttpContext context)
+      throws ClientProtocolException, IOException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java
deleted file mode 100644
index 6d6e803..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/HttpClientProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.replication.pull.client;
-
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.conn.HttpClientConnectionManager;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-
-/** Provides an HTTP client with SSL capabilities. */
-public class HttpClientProvider implements Provider<CloseableHttpClient> {
-  private static final int CONNECTIONS_PER_ROUTE = 100;
-
-  // Up to 2 target instances with the max number of connections per host:
-  private static final int MAX_CONNECTIONS = 2 * CONNECTIONS_PER_ROUTE;
-
-  private static final int MAX_CONNECTION_INACTIVITY_MS = 10000;
-  private static final int DEFAULT_TIMEOUT_MS = 5000;
-
-  @Override
-  public CloseableHttpClient get() {
-    try {
-      return HttpClients.custom()
-          .setConnectionManager(customConnectionManager())
-          .setDefaultRequestConfig(customRequestConfig())
-          .build();
-    } catch (Exception e) {
-      throw new ProvisionException("Couldn't create CloseableHttpClient", e);
-    }
-  }
-
-  private RequestConfig customRequestConfig() {
-    return RequestConfig.custom()
-        .setConnectTimeout(DEFAULT_TIMEOUT_MS)
-        .setSocketTimeout(DEFAULT_TIMEOUT_MS)
-        .setConnectionRequestTimeout(DEFAULT_TIMEOUT_MS)
-        .build();
-  }
-
-  private HttpClientConnectionManager customConnectionManager() throws Exception {
-    PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
-    connManager.setDefaultMaxPerRoute(CONNECTIONS_PER_ROUTE);
-    connManager.setMaxTotal(MAX_CONNECTIONS);
-    connManager.setValidateAfterInactivity(MAX_CONNECTION_INACTIVITY_MS);
-    return connManager;
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
new file mode 100644
index 0000000..ee0fe79
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/SourceHttpClient.java
@@ -0,0 +1,74 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.client;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import java.io.IOException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.protocol.HttpContext;
+
+/** Apache HTTP client implementation based on Source-specific parameters */
+public class SourceHttpClient implements HttpClient {
+  private final Source source;
+
+  public interface Factory {
+    public HttpClient create(Source source);
+  }
+
+  @Inject
+  public SourceHttpClient(@Assisted Source source) {
+    this.source = source;
+  }
+
+  @Override
+  public <T> T execute(
+      HttpUriRequest request, ResponseHandler<? extends T> responseHandler, HttpContext context)
+      throws ClientProtocolException, IOException {
+    return source
+        .memoize(
+            () ->
+                HttpClients.custom()
+                    .setConnectionManager(customConnectionManager(source))
+                    .setDefaultRequestConfig(customRequestConfig(source))
+                    .build())
+        .execute(request, responseHandler, context);
+  }
+
+  private static RequestConfig customRequestConfig(Source source) {
+    int connectionTimeout = source.getConnectionTimeout();
+    return RequestConfig.custom()
+        .setConnectTimeout(connectionTimeout)
+        .setSocketTimeout(connectionTimeout)
+        .setConnectionRequestTimeout(connectionTimeout)
+        .build();
+  }
+
+  private static HttpClientConnectionManager customConnectionManager(Source source) {
+    PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager();
+
+    connManager.setDefaultMaxPerRoute(source.getMaxConnectionsPerRoute());
+    connManager.setMaxTotal(source.getMaxConnections());
+    connManager.setValidateAfterInactivity(source.getIdleTimeout());
+    return connManager;
+  }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
index 46ec740..d921e0b 100644
--- a/src/main/resources/Documentation/about.md
+++ b/src/main/resources/Documentation/about.md
@@ -8,3 +8,10 @@
 configuration is not recommended.  It is also possible to specify a
 local path as replication source. This makes e.g. sense if a network
 share is mounted to which the repositories should be replicated from.
+
+Access
+------
+
+To be allowed to trigger pull replication a user must be a member of a
+group that is granted the 'Pull Replication' capability (provided
+by this plugin) or the 'Administrate Server' capability.
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index c8c53df..a6ea668 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -121,6 +121,16 @@
 	provided in the remote configuration section which name is equal
 	to instanceLabel.
 
+replication.maxConnectionsPerRoute
+:	Maximum number of HTTP connections per one HTTP route.
+
+	Default: 100
+
+replication.maxConnections
+:	Total number of HTTP connections pool.
+
+	Default: 2 * replication.maxConnectionsPerRoute
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
@@ -148,6 +158,21 @@
 	different destinations which share the same settings. Gerrit calls
 	all URLs in sequence.
 
+remote.NAME.connectionTimeout
+:	Defines the socket timeout ({@code SO_TIMEOUT}) in milliseconds,
+	which is the timeout for waiting for data or, put differently,
+	a maximum period inactivity between two consecutive data packets.
+
+	Default: 5000
+
+remote.NAME.idleTimeout
+:	Defines period of inactivity in milliseconds after which persistent connections must
+	be re-validated prior to being leased to the consumer. Non-positive value disables 
+	connection validation. This check helps detect connections that have become stale
+	(half-closed) while kept inactive in the pool.
+
+	Default: 10000
+
 remote.NAME.uploadpack
 :	Path of the `git-upload-pack` executable on the remote system,
 	if using the SSH transport.
@@ -291,6 +316,82 @@
 	By default, replicates without matching, i.e. replicates
 	everything from all remotes.
 
+Directory `replication`
+--------------------
+The optional directory `$site_path/etc/replication` contains Git-style
+config files that controls the replication settings for the pull replication
+plugin. When present all `remote` sections from `replication.config` file are
+ignored.
+
+Files are composed of one `remote` section. Multiple `remote` sections or any
+other section makes the file invalid and skipped by the pull replication plugin.
+File name defines remote section name. Each section provides common configuration
+settings for one or more destination URLs. For more details how to setup `remote`
+sections please refer to the `replication.config` section.
+
+### Configuration example:
+
+Static configuration in `$site_path/etc/replication.config`:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+	instanceLabel = host-one
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+```
+
+Remote sections in `$site_path/etc/replication` directory:
+
+* File `$site_path/etc/replication/host-two.config`
+
+ ```
+ [remote]
+    url = gerrit2@host-two.example.com:/some/path/${name}.git
+    apiUrl = http://host-two
+    fetch = +refs/*:refs/*
+ ```
+
+
+* File `$site_path/etc/replication/host-three.config`
+
+ ```
+  [remote]
+    url = mirror1.host-three:/pub/git/${name}.git
+    url = mirror2.host-three:/pub/git/${name}.git
+    url = mirror3.host-three:/pub/git/${name}.git
+    apiUrl = http://host-three
+    fetch = +refs/heads/*:refs/heads/*
+    fetch = +refs/tags/*:refs/tags/*
+ ```
+
+Pull replication plugin resolves config files to the following configuration:
+
+```
+[gerrit]
+    autoReload = true
+    replicateOnStartup = false
+[replication]
+    instanceLabel = host-one
+    lockErrorMaxRetries = 5
+    maxRetries = 5
+
+[remote "host-two"]
+    url = gerrit2@host-two.example.com:/some/path/${name}.git
+    apiUrl = http://host-two
+    fetch = +refs/*:refs/*
+
+[remote "host-three"]
+    url = mirror1.host-three:/pub/git/${name}.git
+    url = mirror2.host-three:/pub/git/${name}.git
+    url = mirror3.host-three:/pub/git/${name}.git
+    apiUrl = http://host-three
+    fetch = +refs/heads/*:refs/heads/*
+    fetch = +refs/tags/*:refs/tags/*
+```
+
 File `secure.config`
 --------------------
 
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
new file mode 100644
index 0000000..cb5af42
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationFanoutConfigIT.java
@@ -0,0 +1,311 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.ReceiveCommand;
+import org.eclipse.jgit.util.FS;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule")
+public class PullReplicationFanoutConfigIT extends LightweightPluginDaemonTest {
+  private static final Optional<String> ALL_PROJECTS = Optional.empty();
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int TEST_REPLICATION_DELAY = 60;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final String TEST_REPLICATION_REMOTE = "remote1";
+
+  @Inject private SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  private Path gitPath;
+  private FileBasedConfig config;
+  private FileBasedConfig remoteConfig;
+  private FileBasedConfig secureConfig;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+
+    config =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("replication.config").toFile(), FS.DETECTED);
+    remoteConfig =
+        new FileBasedConfig(
+            sitePaths
+                .etc_dir
+                .resolve("replication/" + TEST_REPLICATION_REMOTE + ".config")
+                .toFile(),
+            FS.DETECTED);
+
+    setReplicationSource(
+        TEST_REPLICATION_REMOTE); // Simulates a full replication.config initialization
+
+    setRemoteConfig(TEST_REPLICATION_SUFFIX, ALL_PROJECTS);
+
+    secureConfig =
+        new FileBasedConfig(sitePaths.etc_dir.resolve("secure.config").toFile(), FS.DETECTED);
+    setReplicationCredentials(TEST_REPLICATION_REMOTE, admin.username(), admin.httpPassword());
+    secureConfig.save();
+
+    super.setUpTestPlugin();
+  }
+
+  @Test
+  public void shouldReplicateNewChangeRef() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            sourceRef,
+            ObjectId.zeroId().getName(),
+            sourceCommit.getId().getName(),
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewChangeRefAfterConfigReloaded() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    // Trigger configuration autoreload
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+    SourcesCollection sources = getInstance(SourcesCollection.class);
+    remoteConfig.setInt("remote", null, "timeout", 1000);
+    remoteConfig.save();
+    autoReloadConfigDecorator.reload();
+    waitUntil(() -> !sources.getAll().isEmpty() && sources.getAll().get(0).getTimeout() == 1000);
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    final String sourceRef = pushResult.getPatchSet().refName();
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            sourceRef,
+            ObjectId.zeroId().getName(),
+            sourceCommit.getId().getName(),
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranch() throws Exception {
+    String testProjectName = project + TEST_REPLICATION_SUFFIX;
+    createTestProject(testProjectName);
+
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+    String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
+
+    ReplicationQueue pullReplicationQueue =
+        plugin.getSysInjector().getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            newBranch,
+            ObjectId.zeroId().getName(),
+            branchRevision,
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
+    }
+  }
+
+  @Test
+  public void shouldAutoReloadConfiguration() throws Exception {
+    SourcesCollection sources = getInstance(SourcesCollection.class);
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+    assertThat(sources.getAll().get(0).getTimeout()).isEqualTo(600);
+    remoteConfig.setInt("remote", null, "timeout", 1000);
+    remoteConfig.save();
+    autoReloadConfigDecorator.reload();
+    waitUntil(() -> !sources.getAll().isEmpty() && sources.getAll().get(0).getTimeout() == 1000);
+  }
+
+  @Test
+  public void shouldAutoReloadConfigurationWhenRemoteConfigAdded() throws Exception {
+    FileBasedConfig newRemoteConfig =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/new-remote-config.config").toFile(),
+            FS.DETECTED);
+    try {
+      SourcesCollection sources = getInstance(SourcesCollection.class);
+      AutoReloadConfigDecorator autoReloadConfigDecorator =
+          getInstance(AutoReloadConfigDecorator.class);
+      assertThat(sources.getAll().size()).isEqualTo(1);
+
+      setRemoteConfig(newRemoteConfig, TEST_REPLICATION_SUFFIX, ALL_PROJECTS);
+      autoReloadConfigDecorator.reload();
+      waitUntil(() -> sources.getAll().size() == 2);
+
+    } finally {
+      // clean up
+      Files.delete(newRemoteConfig.getFile().toPath());
+    }
+  }
+
+  @Test
+  public void shouldAutoReloadConfigurationWhenRemoteConfigDeleted() throws Exception {
+    SourcesCollection sources = getInstance(SourcesCollection.class);
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+    assertThat(sources.getAll().size()).isEqualTo(1);
+
+    FileBasedConfig newRemoteConfig =
+        new FileBasedConfig(
+            sitePaths.etc_dir.resolve("replication/new-remote-config.config").toFile(),
+            FS.DETECTED);
+    setRemoteConfig(newRemoteConfig, TEST_REPLICATION_SUFFIX, ALL_PROJECTS);
+    autoReloadConfigDecorator.reload();
+    waitUntil(() -> sources.getAll().size() == 2);
+
+    Files.delete(newRemoteConfig.getFile().toPath());
+
+    autoReloadConfigDecorator.reload();
+
+    waitUntil(() -> sources.getAll().size() == 1);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private void setReplicationSource(String remoteName) throws IOException {
+    config.setString("replication", null, "instanceLabel", remoteName);
+    config.setBoolean("gerrit", null, "autoReload", true);
+    config.save();
+  }
+
+  private void setRemoteConfig(String replicaSuffix, Optional<String> project) throws IOException {
+    setRemoteConfig(remoteConfig, replicaSuffix, project);
+  }
+
+  private void setRemoteConfig(
+      FileBasedConfig remoteConfig, String replicaSuffix, Optional<String> project)
+      throws IOException {
+    setRemoteConfig(remoteConfig, Arrays.asList(replicaSuffix), project);
+  }
+
+  private void setRemoteConfig(
+      FileBasedConfig remoteConfig, List<String> replicaSuffixes, Optional<String> project)
+      throws IOException {
+    List<String> replicaUrls =
+        replicaSuffixes.stream()
+            .map(suffix -> gitPath.resolve("${name}" + suffix + ".git").toString())
+            .collect(toList());
+    remoteConfig.setStringList("remote", null, "url", replicaUrls);
+    remoteConfig.setString("remote", null, "apiUrl", adminRestSession.url());
+    remoteConfig.setString("remote", null, "fetch", "+refs/*:refs/*");
+    remoteConfig.setInt("remote", null, "timeout", 600);
+    remoteConfig.setInt("remote", null, "replicationDelay", TEST_REPLICATION_DELAY);
+    project.ifPresent(prj -> remoteConfig.setString("remote", null, "projects", prj));
+    remoteConfig.save();
+  }
+
+  private void setReplicationCredentials(String remoteName, String username, String password)
+      throws IOException {
+    secureConfig.setString("remote", remoteName, "username", username);
+    secureConfig.setString("remote", remoteName, "password", password);
+    secureConfig.save();
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private <T> T getInstance(Class<T> classObj) {
+    return plugin.getSysInjector().getInstance(classObj);
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
index 39c6717..2c888c8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -18,11 +18,12 @@
 import static org.apache.http.HttpStatus.SC_ACCEPTED;
 import static org.apache.http.HttpStatus.SC_CREATED;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.anyString;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 
 import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
 import com.google.gerrit.extensions.restapi.BadRequestException;
 import com.google.gerrit.extensions.restapi.Response;
 import com.google.gerrit.extensions.restapi.RestApiException;
@@ -60,6 +61,7 @@
   @Mock DynamicItem<UrlFormatter> urlFormatterDynamicItem;
   @Mock UrlFormatter urlFormatter;
   @Mock WorkQueue.Task<Void> task;
+  @Mock FetchPreconditions preConditions;
 
   @Before
   public void setup() {
@@ -75,8 +77,9 @@
             });
     when(urlFormatterDynamicItem.get()).thenReturn(urlFormatter);
     when(task.getTaskId()).thenReturn(taskId);
+    when(preConditions.canCallFetchApi()).thenReturn(true);
 
-    fetchAction = new FetchAction(fetchCommand, workQueue, urlFormatterDynamicItem);
+    fetchAction = new FetchAction(fetchCommand, workQueue, urlFormatterDynamicItem, preConditions);
   }
 
   @Test
@@ -205,6 +208,18 @@
     fetchAction.apply(projectResource, inputParams);
   }
 
+  @Test(expected = AuthException.class)
+  public void shouldThrowAuthExceptionWhenCallFetchActionCapabilityNotAssigned()
+      throws RestApiException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.refName = refName;
+
+    when(preConditions.canCallFetchApi()).thenReturn(false);
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
   @Test
   public void shouldReturnScheduledTaskForAsyncCall() throws RestApiException {
     FetchAction.Input inputParams = new FetchAction.Input();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index 22dd0e9..669161f 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -34,7 +34,6 @@
 import org.apache.http.Header;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.message.BasicHeader;
 import org.eclipse.jgit.storage.file.FileBasedConfig;
 import org.eclipse.jgit.transport.CredentialItem;
@@ -56,7 +55,8 @@
 public class FetchRestApiClientTest {
   @Mock CredentialsProvider credentialProvider;
   @Mock CredentialsFactory credentials;
-  @Mock CloseableHttpClient httpClient;
+  @Mock HttpClient httpClient;
+  @Mock SourceHttpClient.Factory httpClientFactory;
   @Mock FileBasedConfig config;
   @Mock ReplicationFileBasedConfig replicationConfig;
   @Mock Source source;
@@ -96,8 +96,9 @@
 
     HttpResult httpResult = new HttpResult(SC_CREATED, Optional.of("result message"));
     when(httpClient.execute(any(HttpPost.class), any(), any())).thenReturn(httpResult);
-
-    objectUnderTest = new FetchRestApiClient(credentials, httpClient, replicationConfig, source);
+    when(httpClientFactory.create(any())).thenReturn(httpClient);
+    objectUnderTest =
+        new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source);
   }
 
   @Test