Merge branch 'stable-3.2'

* stable-3.2:
  Block Fetch Rest API call when instanceLabel is missing
  Execute refs fetch in batches
  Add native git fetch support
  Make sure replication task clean up is triggered
  Fix issue with replication when replicationDelay is set to zero

Change-Id: I6443e8cdf52225ec63842a15e5f15d49b2cf7364
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 43cd4ef..d4e22d9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -31,7 +31,9 @@
 import com.google.gerrit.server.util.IdGenerator;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
 import com.jcraft.jsch.JSchException;
 import java.io.IOException;
 import java.util.Collection;
@@ -47,15 +49,10 @@
 import org.eclipse.jgit.errors.RemoteRepositoryException;
 import org.eclipse.jgit.errors.RepositoryNotFoundException;
 import org.eclipse.jgit.errors.TransportException;
-import org.eclipse.jgit.lib.NullProgressMonitor;
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
-import org.eclipse.jgit.transport.CredentialsProvider;
-import org.eclipse.jgit.transport.FetchResult;
 import org.eclipse.jgit.transport.RefSpec;
 import org.eclipse.jgit.transport.RemoteConfig;
-import org.eclipse.jgit.transport.TrackingRefUpdate;
-import org.eclipse.jgit.transport.Transport;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.MDC;
 
@@ -77,7 +74,6 @@
   private final GitRepositoryManager gitManager;
   private final Source pool;
   private final RemoteConfig config;
-  private final CredentialsProvider credentialsProvider;
   private final PerThreadRequestScope.Scoper threadScoper;
 
   private final Project.NameKey projectName;
@@ -96,23 +92,23 @@
   private final long createdAt;
   private final FetchReplicationMetrics metrics;
   private final AtomicBoolean canceledWhileRunning;
+  private final FetchFactory fetchFactory;
 
   @Inject
   FetchOne(
       GitRepositoryManager grm,
       Source s,
-      RemoteConfig c,
-      CredentialsFactory cpFactory,
+      SourceConfiguration c,
       PerThreadRequestScope.Scoper ts,
       IdGenerator ig,
       ReplicationStateListeners sl,
       FetchReplicationMetrics m,
+      FetchFactory fetchFactory,
       @Assisted Project.NameKey d,
       @Assisted URIish u) {
     gitManager = grm;
     pool = s;
-    config = c;
-    credentialsProvider = cpFactory.create(c.getName());
+    config = c.getRemoteConfig();
     threadScoper = ts;
     projectName = d;
     uri = u;
@@ -123,6 +119,7 @@
     createdAt = System.nanoTime();
     metrics = m;
     canceledWhileRunning = new AtomicBoolean(false);
+    this.fetchFactory = fetchFactory;
     maxRetries = s.getMaxRetries();
   }
 
@@ -365,21 +362,9 @@
   }
 
   private void runImpl() throws IOException {
-    FetchResult res;
-    try (Transport tn = Transport.open(git, uri)) {
-      res = fetchVia(tn);
-    }
-    updateStates(res.getTrackingRefUpdates());
-  }
-
-  private FetchResult fetchVia(Transport tn) throws IOException {
-    tn.applyConfig(config);
-    tn.setCredentialsProvider(credentialsProvider);
-
+    Fetch fetch = fetchFactory.create(uri, git);
     List<RefSpec> fetchRefSpecs = getFetchRefSpecs();
-
-    repLog.info("Fetch references {} from {}", fetchRefSpecs, uri);
-    return tn.fetch(NullProgressMonitor.INSTANCE, fetchRefSpecs);
+    updateStates(fetch.fetch(fetchRefSpecs));
   }
 
   private List<RefSpec> getFetchRefSpecs() {
@@ -389,12 +374,12 @@
     return delta.stream().map(ref -> new RefSpec(ref + ":" + ref)).collect(Collectors.toList());
   }
 
-  private void updateStates(Collection<TrackingRefUpdate> refUpdates) throws IOException {
+  private void updateStates(List<RefUpdateState> refUpdates) throws IOException {
     Set<String> doneRefs = new HashSet<>();
     boolean anyRefFailed = false;
     RefUpdate.Result lastRefUpdateResult = RefUpdate.Result.NO_CHANGE;
 
-    for (TrackingRefUpdate u : refUpdates) {
+    for (RefUpdateState u : refUpdates) {
       ReplicationState.RefFetchResult fetchStatus = ReplicationState.RefFetchResult.SUCCEEDED;
       Set<ReplicationState> logStates = new HashSet<>();
       lastRefUpdateResult = u.getResult();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index 3a4d4a7..565caee 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -24,9 +24,9 @@
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
 import com.google.common.flogger.FluentLogger;
-import com.google.gerrit.common.data.GroupReference;
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.GroupReference;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.entities.RefNames;
 import com.google.gerrit.exceptions.StorageException;
@@ -60,6 +60,13 @@
 import com.google.inject.servlet.RequestScoped;
 import com.googlesource.gerrit.plugins.replication.RemoteSiteUser;
 import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetchValidator;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchClientImplementation;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.JGitFetch;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -70,7 +77,6 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -82,7 +88,6 @@
 import org.eclipse.jgit.lib.RefUpdate;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.transport.RefSpec;
-import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 import org.slf4j.Logger;
 
@@ -95,7 +100,7 @@
   }
 
   private final ReplicationStateListener stateLog;
-  private final Map<Project.NameKey, Object> stateLock = new ConcurrentHashMap<>();
+  private final Object stateLock = new Object();
   private final Map<URIish, FetchOne> pending = new HashMap<>();
   private final Map<URIish, FetchOne> inFlight = new HashMap<>();
   private final FetchOne.Factory opFactory;
@@ -172,8 +177,15 @@
                 bind(PerThreadRequestScope.Propagator.class);
 
                 bind(Source.class).toInstance(Source.this);
-                bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
+                bind(SourceConfiguration.class).toInstance(config);
                 install(new FactoryModuleBuilder().build(FetchOne.Factory.class));
+                Class<? extends Fetch> clientClass =
+                    cfg.useCGitClient() ? CGitFetch.class : JGitFetch.class;
+                install(
+                    new FactoryModuleBuilder()
+                        .implement(Fetch.class, BatchFetchClient.class)
+                        .implement(Fetch.class, FetchClientImplementation.class, clientClass)
+                        .build(FetchFactory.class));
               }
 
               @Provides
@@ -194,7 +206,7 @@
                 };
               }
             });
-
+    child.getBinding(FetchFactory.class).acceptTargetVisitor(new CGitFetchValidator());
     opFactory = child.getInstance(FetchOne.Factory.class);
     threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
   }
@@ -363,7 +375,7 @@
 
     if (!config.replicatePermissions()) {
       FetchOne e;
-      synchronized (stateLock.getOrDefault(project, new Object())) {
+      synchronized (stateLock) {
         e = pending.get(uri);
       }
       if (e == null) {
@@ -386,15 +398,15 @@
       }
     }
 
-    synchronized (stateLock.getOrDefault(project, new Object())) {
+    synchronized (stateLock) {
       FetchOne e = pending.get(uri);
       Future<?> f = CompletableFuture.completedFuture(null);
       if (e == null) {
         e = opFactory.create(project, uri);
         addRef(e, ref);
         e.addState(ref, state);
-        f = pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
         pending.put(uri, e);
+        f = pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
         e.addState(ref, state);
@@ -406,7 +418,7 @@
   }
 
   void fetchWasCanceled(FetchOne fetchOp) {
-    synchronized (stateLock.getOrDefault(fetchOp.getProjectNameKey(), new Object())) {
+    synchronized (stateLock) {
       URIish uri = fetchOp.getURI();
       pending.remove(uri);
     }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
index 04fae31..e50461d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -23,7 +23,7 @@
 import org.eclipse.jgit.transport.RemoteConfig;
 
 public class SourceConfiguration implements RemoteConfiguration {
-  static final int DEFAULT_REPLICATION_DELAY = 15;
+  static final int DEFAULT_REPLICATION_DELAY = 0;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
   static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
   static final int DEFAULT_MAX_CONNECTION_INACTIVITY_MS = 10000;
@@ -50,6 +50,8 @@
   private final int maxConnections;
   private final int maxRetries;
   private int slowLatencyThreshold;
+  private boolean useCGitClient;
+  private int refsBatchSize;
 
   public SourceConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
@@ -74,6 +76,10 @@
 
     replicatePermissions = cfg.getBoolean("remote", name, "replicatePermissions", true);
     replicateHiddenProjects = cfg.getBoolean("remote", name, "replicateHiddenProjects", false);
+    useCGitClient = cfg.getBoolean("replication", "useCGitClient", false);
+    refsBatchSize = cfg.getInt("replication", "refsBatchSize", 50);
+    if (refsBatchSize <= 0)
+      throw new IllegalArgumentException("refsBatchSize must be greater than zero");
     remoteNameStyle =
         MoreObjects.firstNonNull(cfg.getString("remote", name, "remoteNameStyle"), "slash");
     maxRetries =
@@ -167,6 +173,14 @@
     return replicateHiddenProjects;
   }
 
+  public boolean useCGitClient() {
+    return useCGitClient;
+  }
+
+  public int getRefsBatchSize() {
+    return refsBatchSize;
+  }
+
   @Override
   public RemoteConfig getRemoteConfig() {
     return remoteConfig;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
index b3ab83d..7470c2c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -59,8 +59,8 @@
     }
 
     try {
-      Future<?> future = source.get().schedule(name, refName, state, true);
       state.markAllFetchTasksScheduled();
+      Future<?> future = source.get().schedule(name, refName, state, true);
       future.get(source.get().getTimeout(), TimeUnit.SECONDS);
     } catch (ExecutionException
         | IllegalStateException
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
index 502d3b7..bdfc1c3 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClient.java
@@ -14,6 +14,9 @@
 
 package com.googlesource.gerrit.plugins.replication.pull.client;
 
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.Strings;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.extensions.restapi.Url;
@@ -64,7 +67,11 @@
     this.httpClientFactory = httpClientFactory;
     this.source = source;
     this.instanceLabel =
-        replicationConfig.getConfig().getString("replication", null, "instanceLabel");
+        Strings.nullToEmpty(
+                replicationConfig.getConfig().getString("replication", null, "instanceLabel"))
+            .trim();
+    requireNonNull(
+        Strings.emptyToNull(instanceLabel), "replication.instanceLabel cannot be null or empty");
   }
 
   public HttpResult callFetch(Project.NameKey project, String refName, URIish targetUri)
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
new file mode 100644
index 0000000..f2fbcd1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/BatchFetchClient.java
@@ -0,0 +1,67 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
+import java.io.IOException;
+import java.util.List;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * {@link Fetch} decorator that splits the requested refspecs into batches and delegates each
+ * batch to the plain (undecorated) fetch client.
+ */
+public class BatchFetchClient implements Fetch {
+  // Both fields are assigned once in the constructor and never changed afterwards.
+  private final int batchSize;
+  private final Fetch fetchClient;
+
+  /**
+   * @param config source configuration providing the refs batch size
+   * @param factory factory used to obtain the undecorated fetch client
+   * @param uri remote URI, passed through to the undecorated client
+   * @param git local repository, passed through to the undecorated client
+   */
+  @Inject
+  public BatchFetchClient(
+      SourceConfiguration config,
+      FetchFactory factory,
+      @Assisted URIish uri,
+      @Assisted Repository git) {
+    this.batchSize = config.getRefsBatchSize();
+    this.fetchClient = factory.createPlainImpl(uri, git);
+  }
+
+  /**
+   * Fetches {@code refs} in consecutive batches of at most {@code batchSize} refspecs.
+   *
+   * @param refs refspecs to fetch
+   * @return the concatenated results of all batches, in batch order
+   * @throws IOException if the delegate fails on a batch; later batches are not attempted
+   */
+  @Override
+  public List<RefUpdateState> fetch(List<RefSpec> refs) throws IOException {
+    List<RefUpdateState> results = Lists.newArrayList();
+    for (List<RefSpec> refsBatch : Lists.partition(refs, batchSize)) {
+      results.addAll(fetchClient.fetch(refsBatch));
+    }
+    return results;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
new file mode 100644
index 0000000..9f055c8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetch.java
@@ -0,0 +1,127 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.URIish;
+
+/**
+ * {@link Fetch} implementation that runs the native {@code git fetch} command as a child
+ * process inside the local repository directory.
+ */
+public class CGitFetch implements Fetch {
+
+  private final File localProjectDirectory;
+  private final URIish uri;
+  // Seconds to wait for the git process; 0 means wait without a time limit.
+  private final int timeout;
+
+  /**
+   * @param config source configuration; its remote config supplies the fetch timeout
+   * @param uri remote URI to fetch from
+   * @param git local repository whose directory is used as the working directory of git
+   */
+  @Inject
+  public CGitFetch(SourceConfiguration config, @Assisted URIish uri, @Assisted Repository git) {
+    this.localProjectDirectory = git.getDirectory();
+    this.uri = uri;
+    this.timeout = config.getRemoteConfig().getTimeout();
+  }
+
+  /**
+   * Runs {@code git fetch <uri> <refspec>...} and waits for it to finish.
+   *
+   * <p>The output of git is not parsed: on exit code 0 every requested ref is reported with
+   * {@link RefUpdate.Result#NEW}.
+   *
+   * @param refsSpec refspecs to fetch
+   * @return one {@link RefUpdateState} per requested refspec, named after the refspec source
+   * @throws IOException if the process cannot be started, times out or exits with an error
+   * @throws IllegalStateException if the calling thread is interrupted while waiting
+   */
+  @Override
+  public List<RefUpdateState> fetch(List<RefSpec> refsSpec) throws IOException {
+    List<String> refs = refsSpec.stream().map(RefSpec::toString).collect(Collectors.toList());
+    List<String> command = Lists.newArrayList("git", "fetch", uri.toASCIIString());
+    command.addAll(refs);
+    ProcessBuilder pb = new ProcessBuilder().command(command).directory(localProjectDirectory);
+    repLog.info("Fetch references {} from {}", refs, uri);
+    Process process = pb.start();
+
+    try {
+      boolean isFinished = waitForTaskToFinish(process);
+      if (!isFinished) {
+        // Do not leave the child process running once we give up on it.
+        process.destroyForcibly();
+        throw new TransportException(
+            String.format("Timeout exception during the fetch from: %s, refs: %s", uri, refs));
+      }
+      if (process.exitValue() != 0) {
+        throw new TransportException(
+            String.format(
+                "Cannot fetch from %s, error message: %s", uri, readErrorOutput(process)));
+      }
+
+      return refsSpec.stream()
+          .map(value -> new RefUpdateState(value.getSource(), RefUpdate.Result.NEW))
+          .collect(Collectors.toList());
+    } catch (InterruptedException e) {
+      repLog.error("Thread interrupted during the fetch from: {}, refs: {}", uri, refs);
+      process.destroyForcibly();
+      // Restore the interrupt flag so callers further up the stack can observe it.
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Waits for {@code process} to terminate, honouring the configured timeout.
+   *
+   * @param process the process to wait for
+   * @return {@code true} if the process terminated, {@code false} if the timeout elapsed first
+   * @throws InterruptedException if the current thread is interrupted while waiting
+   */
+  public boolean waitForTaskToFinish(Process process) throws InterruptedException {
+    if (timeout == 0) {
+      process.waitFor();
+      return true;
+    }
+    return process.waitFor(timeout, TimeUnit.SECONDS);
+  }
+
+  /** Reads everything the process wrote to its standard error stream and closes the stream. */
+  private static String readErrorOutput(Process process) throws IOException {
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+      return reader.lines().collect(Collectors.joining("\n"));
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetchValidator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetchValidator.java
new file mode 100644
index 0000000..9a10898
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/CGitFetchValidator.java
@@ -0,0 +1,98 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.inject.TypeLiteral;
+import com.google.inject.assistedinject.AssistedInjectBinding;
+import com.google.inject.assistedinject.AssistedInjectTargetVisitor;
+import com.google.inject.assistedinject.AssistedMethod;
+import com.google.inject.spi.DefaultBindingTargetVisitor;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Guice binding visitor that verifies the native git client can be executed whenever the
+ * visited {@link FetchFactory} binding has an assisted method implemented by {@link CGitFetch}.
+ */
+public class CGitFetchValidator extends DefaultBindingTargetVisitor<FetchFactory, Void>
+    implements AssistedInjectTargetVisitor<FetchFactory, Void> {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  // Upper bound, in seconds, for the "git --version" probe.
+  private static final int DEFAULT_TIMEOUT_IN_SECONDS = 10;
+
+  /**
+   * Runs {@code git --version} for every assisted method implemented by {@link CGitFetch} and
+   * logs the reported version.
+   *
+   * @param binding the assisted-inject binding of the fetch factory
+   * @return always {@code null}
+   * @throws IllegalStateException if git cannot be started, does not finish in time, exits with
+   *     a non-zero code, or the calling thread is interrupted while waiting
+   */
+  @Override
+  public Void visit(AssistedInjectBinding<? extends FetchFactory> binding) {
+    TypeLiteral<CGitFetch> nativeGitFetchType = new TypeLiteral<CGitFetch>() {};
+    for (AssistedMethod method : binding.getAssistedMethods()) {
+      if (method.getImplementationType().equals(nativeGitFetchType)) {
+        String[] command = new String[] {"git", "--version"};
+
+        ProcessBuilder pb = new ProcessBuilder().command(command);
+        try {
+          Process process = pb.start();
+
+          boolean isFinished = process.waitFor(DEFAULT_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
+          if (!isFinished) {
+            // Do not leave the probe process running once we give up on it.
+            process.destroyForcibly();
+            throw new IllegalStateException(
+                "Timeout while checking if native git client is available");
+          }
+          if (process.exitValue() != 0) {
+            String errorMessage = readMessage(process.getErrorStream());
+            throw new IllegalStateException(
+                String.format(
+                    "Cannot check if native git client is available, error message: %s",
+                    errorMessage));
+          }
+
+          String commandOutputMessage = readMessage(process.getInputStream());
+          logger.atInfo().log("Native git client version: %s", commandOutputMessage);
+        } catch (IOException e) {
+          throw new IllegalStateException(
+              "Cannot start process to check if native git client is available", e);
+        } catch (InterruptedException e) {
+          // This is an interruption, not a timeout: restore the flag and keep the cause.
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(
+              "Interrupted while checking if native git client is available", e);
+        }
+      }
+    }
+    return null;
+  }
+
+  /** Reads the whole stream as text, joining lines with a newline character. */
+  private String readMessage(InputStream stream) {
+    return new BufferedReader(new InputStreamReader(stream))
+        .lines()
+        .collect(Collectors.joining("\n"));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/Fetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/Fetch.java
new file mode 100644
index 0000000..1e99a94
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/Fetch.java
@@ -0,0 +1,32 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import java.io.IOException;
+import java.util.List;
+import org.eclipse.jgit.transport.RefSpec;
+
+/** A client that fetches a list of refspecs and reports the resulting ref update states. */
+@FunctionalInterface
+public interface Fetch {
+  /**
+   * Fetches the given refspecs.
+   *
+   * @param refs refspecs to fetch
+   * @return the update states reported for the fetched refs
+   * @throws IOException if the fetch fails
+   */
+  List<RefUpdateState> fetch(List<RefSpec> refs) throws IOException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchClientImplementation.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchClientImplementation.java
new file mode 100644
index 0000000..77317b7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchClientImplementation.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Guice binding annotation that marks the factory method returning the plain fetch client
+ * implementation, i.e. the one without any decorators.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface FetchClientImplementation {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
new file mode 100644
index 0000000..d356477
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/FetchFactory.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.URIish;
+
+/** Assisted-inject factory for {@link Fetch} clients bound to a remote URI and a repository. */
+public interface FetchFactory {
+  /**
+   * Creates the default fetch client.
+   *
+   * @param uri remote URI to fetch from
+   * @param git local repository to fetch into
+   * @return the fetch client
+   */
+  Fetch create(URIish uri, Repository git);
+
+  /**
+   * Returns the implementation without any decorators.
+   *
+   * @param uri remote URI to fetch from
+   * @param git local repository to fetch into
+   * @return the undecorated fetch client
+   */
+  @FetchClientImplementation
+  Fetch createPlainImpl(URIish uri, Repository git);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
new file mode 100644
index 0000000..5de82f1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/JGitFetch.java
@@ -0,0 +1,83 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.pull.SourceConfiguration;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.eclipse.jgit.lib.NullProgressMonitor;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.FetchResult;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+
+/** {@link Fetch} implementation backed by a JGit {@link Transport}. */
+public class JGitFetch implements Fetch {
+  private final RemoteConfig config;
+  private final CredentialsProvider credentialsProvider;
+  URIish uri;
+  Repository git;
+
+  /**
+   * @param sourceConfig supplies the remote config applied to the transport
+   * @param cpFactory factory for the credentials provider, looked up by remote name
+   * @param uri remote URI to fetch from
+   * @param git local repository to fetch into
+   */
+  @Inject
+  public JGitFetch(
+      SourceConfiguration sourceConfig,
+      CredentialsFactory cpFactory,
+      @Assisted URIish uri,
+      @Assisted Repository git) {
+    RemoteConfig remoteConfig = sourceConfig.getRemoteConfig();
+    this.config = remoteConfig;
+    this.credentialsProvider = cpFactory.create(remoteConfig.getName());
+    this.uri = uri;
+    this.git = git;
+  }
+
+  /**
+   * Opens a transport to the remote URI, fetches the given refspecs and closes the transport.
+   *
+   * @param refs refspecs to fetch
+   * @return one {@link RefUpdateState} per tracking ref update reported by JGit
+   * @throws IOException if opening the transport or the fetch itself fails
+   */
+  @Override
+  public List<RefUpdateState> fetch(List<RefSpec> refs) throws IOException {
+    FetchResult fetchResult;
+    try (Transport transport = Transport.open(git, uri)) {
+      transport.applyConfig(config);
+      transport.setCredentialsProvider(credentialsProvider);
+
+      repLog.info("Fetch references {} from {}", refs, uri);
+      fetchResult = transport.fetch(NullProgressMonitor.INSTANCE, refs);
+    }
+
+    return fetchResult.getTrackingRefUpdates().stream()
+        .map(update -> new RefUpdateState(update.getRemoteName(), update.getResult()))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/RefUpdateState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/RefUpdateState.java
new file mode 100644
index 0000000..d316960
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/fetch/RefUpdateState.java
@@ -0,0 +1,49 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.fetch;
+
+import org.eclipse.jgit.lib.RefUpdate;
+
+/** Immutable pair of a remote ref name and the result of updating it. */
+public class RefUpdateState {
+
+  private final String remoteName;
+  private final RefUpdate.Result result;
+
+  /**
+   * @param remoteName name of the ref on the remote
+   * @param result outcome of the ref update
+   */
+  public RefUpdateState(String remoteName, RefUpdate.Result result) {
+    this.remoteName = remoteName;
+    this.result = result;
+  }
+
+  /** Returns the remote ref name. */
+  public String getRemoteName() {
+    return remoteName;
+  }
+
+  /** Returns the ref update result. */
+  public RefUpdate.Result getResult() {
+    return result;
+  }
+
+  /** Formats as {@code RefUpdateState[<remoteName> <result>]}. */
+  @Override
+  public String toString() {
+    return "RefUpdateState[" + remoteName + " " + result + "]";
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index a6ea668..c5d13a0 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -14,7 +14,7 @@
 ```
 
 <a name="example_file">
-Next, create `$site_path/etc/@PLUGIN@.config` as a Git-style config
+Next, create `$site_path/etc/replication.config` as a Git-style config
 file, for example to replicate in parallel from four different hosts:</a>
 
 ```
@@ -47,7 +47,7 @@
 File `@PLUGIN@.config`
 -------------------------
 
-The optional file `$site_path/etc/@PLUGIN@.config` is a Git-style
+The optional file `$site_path/etc/replication.config` is a Git-style
 config file that controls the replication settings for the replication
 plugin.
 
@@ -131,6 +131,22 @@
 
 	Default: 2 * replication.maxConnectionsPerRoute
 
+replication.useCGitClient
+:	By default Gerrit uses the JGit library to execute all git protocol commands.
+	By setting this property to true, all git fetch operations are going to be
+	executed using the CGit client instead of JGit.
+
+	Default: false
+
+replication.refsBatchSize
+:	Number of refs that are fetched in a single fetch call.
+	If the number of refs to fetch is greater than this value,
+	the refs are going to be split into separate fetch operations.
+
+	Value must be greater than zero.
+
+	Default: 50
+
 remote.NAME.url
 :	Address of the remote server to fetch from. Single URL can be
 	specified within a single remote block. A remote node can request
@@ -210,6 +226,15 @@
 
 	Defaults to 0 seconds, wait indefinitely.
 
+remote.NAME.replicationDelay
+:	Time to wait before scheduling a remote fetch operation. Setting
+	the delay to 0 effectively disables the delay, causing the fetch
+	to start as soon as possible.
+
+	This is a Gerrit specific extension to the Git remote block.
+
+	By default, 0 seconds.
+
 remote.NAME.rescheduleDelay
 :	Delay when rescheduling a fetch operation due to an in-flight fetch
 	running for the same project.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
new file mode 100644
index 0000000..a02f43f
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/CGitFetchIT.java
@@ -0,0 +1,285 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
+import com.google.gerrit.acceptance.PushOneCommit.Result;
+import com.google.gerrit.acceptance.SkipProjectClone;
+import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.UseLocalDisk;
+import com.google.gerrit.acceptance.testsuite.project.ProjectOperations;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.api.projects.BranchInput;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.BatchFetchClient;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.CGitFetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.Fetch;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchClientImplementation;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.FetchFactory;
+import com.googlesource.gerrit.plugins.replication.pull.fetch.RefUpdateState;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Supplier;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+@SkipProjectClone
+@UseLocalDisk
+@TestPlugin(
+    name = "pull-replication",
+    sysModule = "com.googlesource.gerrit.plugins.replication.pull.CGitFetchIT$TestModule")
+public class CGitFetchIT extends LightweightPluginDaemonTest {
+  private static final String TEST_REPLICATION_SUFFIX = "suffix1";
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private static final int TEST_REPLICATION_DELAY = 60;
+  private static final Duration TEST_TIMEOUT = Duration.ofSeconds(TEST_REPLICATION_DELAY * 2);
+
+  @Inject private SitePaths sitePaths;
+  @Inject private ProjectOperations projectOperations;
+  private FetchFactory fetchFactory;
+  private Path gitPath;
+  private Path testRepoPath;
+
+  @Override
+  public void setUpTestPlugin() throws Exception {
+    gitPath = sitePaths.site_path.resolve("git");
+    testRepoPath = gitPath.resolve(project + TEST_REPLICATION_SUFFIX + ".git");
+
+    super.setUpTestPlugin();
+    fetchFactory = plugin.getSysInjector().getInstance(FetchFactory.class);
+  }
+
+  @Test
+  public void shouldFetchRef() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    try (Repository repo = repoManager.openRepository(project)) {
+
+      Result pushResult = createChange();
+      RevCommit sourceCommit = pushResult.getCommit();
+      String sourceRef = pushResult.getPatchSet().refName();
+
+      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+
+      objectUnderTest.fetch(Lists.newArrayList(new RefSpec(sourceRef + ":" + sourceRef)));
+
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test(expected = TransportException.class)
+  public void shouldThrowExecptionWhenRefDoesNotExists() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+    String nonExistingRef = "refs/changes/02/20000/1:refs/changes/02/20000/1";
+    try (Repository repo = repoManager.openRepository(project)) {
+
+      createChange();
+
+      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+
+      objectUnderTest.fetch(Lists.newArrayList(new RefSpec(nonExistingRef)));
+    }
+  }
+
+  @Test(expected = TransportException.class)
+  public void shouldThrowExecptionWhenSourceDoesNotExists() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+    try (Repository repo = repoManager.openRepository(project)) {
+
+      Result pushResult = createChange();
+      String sourceRef = pushResult.getPatchSet().refName();
+
+      Fetch objectUnderTest = fetchFactory.create(new URIish("/not_existing_path/"), repo);
+
+      objectUnderTest.fetch(Lists.newArrayList(new RefSpec(sourceRef + ":" + sourceRef)));
+    }
+  }
+
+  @Test
+  public void shouldFetchMultipleRefs() throws Exception {
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    try (Repository repo = repoManager.openRepository(project)) {
+
+      Result pushResultOne = createChange();
+      String sourceRefOne = pushResultOne.getPatchSet().refName();
+      Result pushResultTwo = createChange();
+      String sourceRefTwo = pushResultTwo.getPatchSet().refName();
+
+      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+
+      objectUnderTest.fetch(
+          Lists.newArrayList(
+              new RefSpec(sourceRefOne + ":" + sourceRefOne),
+              new RefSpec(sourceRefTwo + ":" + sourceRefTwo)));
+
+      waitUntil(
+          () ->
+              checkedGetRef(repo, sourceRefOne) != null
+                  && checkedGetRef(repo, sourceRefTwo) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRefOne);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(pushResultOne.getCommit().getId());
+
+      targetBranchRef = getRef(repo, sourceRefTwo);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(pushResultTwo.getCommit().getId());
+    }
+  }
+
+  @Test
+  public void shouldFetchMultipleRefsInMultipleBatches() throws Exception {
+    Config cf = new Config();
+    cf.setInt("remote", "test_config", "timeout", 0);
+    cf.setInt("replication", null, "refsBatchSize", 2);
+    URIish uri = new URIish(testRepoPath.toString());
+    List<RefUpdateState> fetchResultList =
+        Lists.newArrayList(new RefUpdateState("test_config", RefUpdate.Result.NEW));
+    RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
+    SourceConfiguration sourceConfig = new SourceConfiguration(remoteConfig, cf);
+
+    Repository repo = mock(Repository.class);
+    FetchFactory fetchFactory = mock(FetchFactory.class);
+    Fetch fetchClient = mock(Fetch.class);
+    when(fetchFactory.createPlainImpl(uri, repo)).thenReturn(fetchClient);
+    when(fetchClient.fetch(any())).thenReturn(fetchResultList);
+
+    Fetch objectUnderTest =
+        new BatchFetchClient(sourceConfig, fetchFactory, new URIish(testRepoPath.toString()), repo);
+
+    objectUnderTest.fetch(
+        Lists.newArrayList(
+            new RefSpec("refs/changes/01/1/1:refs/changes/01/1/1"),
+            new RefSpec("refs/changes/02/2/1:refs/changes/02/2/1"),
+            new RefSpec("refs/changes/03/3/1:refs/changes/03/3/1")));
+    verify(fetchClient, times(2)).fetch(any());
+  }
+
+  @Test
+  public void shouldFetchNewBranch() throws Exception {
+    String testProjectName = project + TEST_REPLICATION_SUFFIX;
+    createTestProject(testProjectName);
+
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+    String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+
+      objectUnderTest.fetch(Lists.newArrayList(new RefSpec(newBranch + ":" + newBranch)));
+
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
+    }
+  }
+
+  @Test(expected = TransportException.class)
+  public void shouldThrowExceptionWhenBranchDoesNotExists() throws Exception {
+    String testProjectName = project + TEST_REPLICATION_SUFFIX;
+    createTestProject(testProjectName);
+
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      Fetch objectUnderTest = fetchFactory.create(new URIish(testRepoPath.toString()), repo);
+
+      objectUnderTest.fetch(
+          Lists.newArrayList(new RefSpec("non_existing_branch" + ":" + "non_existing_branch")));
+    }
+  }
+
+  private void waitUntil(Supplier<Boolean> waitCondition) throws InterruptedException {
+    WaitUtil.waitUntil(waitCondition, TEST_TIMEOUT);
+  }
+
+  private Ref getRef(Repository repo, String branchName) throws IOException {
+    return repo.getRefDatabase().exactRef(branchName);
+  }
+
+  private Ref checkedGetRef(Repository repo, String branchName) {
+    try {
+      return repo.getRefDatabase().exactRef(branchName);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("failed to get ref %s in repo %s", branchName, repo);
+      return null;
+    }
+  }
+
+  private Project.NameKey createTestProject(String name) throws Exception {
+    return projectOperations.newProject().name(name).create();
+  }
+
+  @SuppressWarnings("unused")
+  private static class TestModule extends FactoryModule {
+    @Override
+    protected void configure() {
+      Config cf = new Config();
+      cf.setInt("remote", "test_config", "timeout", 0);
+      try {
+        RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
+        SourceConfiguration sourceConfig = new SourceConfiguration(remoteConfig, cf);
+
+        bind(SourceConfiguration.class).toInstance(sourceConfig);
+        install(
+            new FactoryModuleBuilder()
+                .implement(Fetch.class, CGitFetch.class)
+                .implement(Fetch.class, FetchClientImplementation.class, CGitFetch.class)
+                .build(FetchFactory.class));
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
index c1c0981..3257fda 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
@@ -15,10 +15,13 @@
 package com.googlesource.gerrit.plugins.replication.pull;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static com.googlesource.gerrit.plugins.replication.pull.Source.encode;
 import static com.googlesource.gerrit.plugins.replication.pull.Source.needsUrlEncoding;
 
 import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
 import org.eclipse.jgit.transport.URIish;
 import org.junit.Test;
 
@@ -43,4 +46,14 @@
     assertThat(encode("name/with a space")).isEqualTo("name/with%20a%20space");
     assertThat(encode("name\nwith-LF")).isEqualTo("name%0Awith-LF");
   }
+
+  @Test
+  public void testRefsBatchSizeMustBeGreaterThanZero() throws URISyntaxException {
+    Config cf = new Config();
+    cf.setInt("remote", "test_config", "timeout", 0);
+    cf.setInt("replication", null, "refsBatchSize", 0);
+    RemoteConfig remoteConfig = new RemoteConfig(cf, "test_config");
+
+    assertThrows(IllegalArgumentException.class, () -> new SourceConfiguration(remoteConfig, cf));
+  }
 }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
index edea656..9557025 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationIT.java
@@ -29,6 +29,7 @@
 import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -145,6 +146,82 @@
     }
   }
 
+  @Test
+  public void shouldReplicateNewChangeRefCGitClient() throws Exception {
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+
+    config.setBoolean("replication", null, "useCGitClient", true);
+    config.save();
+
+    autoReloadConfigDecorator.reload();
+
+    testRepo = cloneProject(createTestProject(project + TEST_REPLICATION_SUFFIX));
+
+    Result pushResult = createChange();
+    RevCommit sourceCommit = pushResult.getCommit();
+    String sourceRef = pushResult.getPatchSet().refName();
+
+    ReplicationQueue pullReplicationQueue = getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            sourceRef,
+            ObjectId.zeroId().getName(),
+            sourceCommit.getId().getName(),
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, sourceRef) != null);
+
+      Ref targetBranchRef = getRef(repo, sourceRef);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId()).isEqualTo(sourceCommit.getId());
+    }
+  }
+
+  @Test
+  public void shouldReplicateNewBranchCGitClient() throws Exception {
+    AutoReloadConfigDecorator autoReloadConfigDecorator =
+        getInstance(AutoReloadConfigDecorator.class);
+
+    config.setBoolean("replication", null, "useCGitClient", true);
+    config.save();
+
+    autoReloadConfigDecorator.reload();
+
+    String testProjectName = project + TEST_REPLICATION_SUFFIX;
+    createTestProject(testProjectName);
+
+    String newBranch = "refs/heads/mybranch";
+    String master = "refs/heads/master";
+    BranchInput input = new BranchInput();
+    input.revision = master;
+    gApi.projects().name(testProjectName).branch(newBranch).create(input);
+    String branchRevision = gApi.projects().name(testProjectName).branch(newBranch).get().revision;
+
+    ReplicationQueue pullReplicationQueue =
+        plugin.getSysInjector().getInstance(ReplicationQueue.class);
+    GitReferenceUpdatedListener.Event event =
+        new FakeGitReferenceUpdatedEvent(
+            project,
+            newBranch,
+            ObjectId.zeroId().getName(),
+            branchRevision,
+            ReceiveCommand.Type.CREATE);
+    pullReplicationQueue.onGitReferenceUpdated(event);
+
+    try (Repository repo = repoManager.openRepository(project);
+        Repository sourceRepo = repoManager.openRepository(project)) {
+      waitUntil(() -> checkedGetRef(repo, newBranch) != null);
+
+      Ref targetBranchRef = getRef(repo, newBranch);
+      assertThat(targetBranchRef).isNotNull();
+      assertThat(targetBranchRef.getObjectId().getName()).isEqualTo(branchRevision);
+    }
+  }
+
   private Ref getRef(Repository repo, String branchName) throws IOException {
     return repo.getRefDatabase().exactRef(branchName);
   }
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
index 669161f..c62ddab 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/client/FetchRestApiClientTest.java
@@ -15,6 +15,7 @@
 package com.googlesource.gerrit.plugins.replication.pull.client;
 
 import static com.google.common.truth.Truth.assertThat;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
 import static javax.servlet.http.HttpServletResponse.SC_CREATED;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
@@ -140,6 +141,30 @@
         .isEqualTo(expectedHeader.getValue());
   }
 
+  @Test
+  public void shouldThrowExceptionWhenInstanceLabelIsNull() {
+    when(config.getString("replication", null, "instanceLabel")).thenReturn(null);
+    assertThrows(
+        NullPointerException.class,
+        () -> new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source));
+  }
+
+  @Test
+  public void shouldTrimInstanceLabel() {
+    when(config.getString("replication", null, "instanceLabel")).thenReturn(" ");
+    assertThrows(
+        NullPointerException.class,
+        () -> new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source));
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenInstanceLabelIsEmpty() {
+    when(config.getString("replication", null, "instanceLabel")).thenReturn("");
+    assertThrows(
+        NullPointerException.class,
+        () -> new FetchRestApiClient(credentials, httpClientFactory, replicationConfig, source));
+  }
+
   public String readPayload(HttpPost entity) throws UnsupportedOperationException, IOException {
     ByteBuffer buf = IO.readWholeStream(entity.getEntity().getContent(), 1024);
     return RawParseUtils.decode(buf.array(), buf.arrayOffset(), buf.limit()).trim();