Initial commit

Change-Id: I9404351df2b4222699b7aa78e04ec23792f5a03b
diff --git a/BUILD b/BUILD
new file mode 100644
index 0000000..7fa956c
--- /dev/null
+++ b/BUILD
@@ -0,0 +1,30 @@
+load("//tools/bzl:junit.bzl", "junit_tests")
+load("//tools/bzl:plugin.bzl", "PLUGIN_DEPS", "PLUGIN_TEST_DEPS", "gerrit_plugin")
+
+gerrit_plugin(
+    name = "pull-replication",
+    srcs = glob(["src/main/java/**/*.java"]),
+    manifest_entries = [
+        "Implementation-Title: Pull Replication plugin",
+        "Implementation-URL: https://gerrit-review.googlesource.com/#/admin/projects/plugins/pull-replication",
+        "Gerrit-PluginName: pull-replication",
+        "Gerrit-Module: com.googlesource.gerrit.plugins.replication.pull.PullReplicationModule",
+        "Gerrit-SshModule: com.googlesource.gerrit.plugins.replication.pull.SshModule",
+    ],
+    resources = glob(["src/main/resources/**/*"]),
+    deps = [
+        "//lib/commons:io",
+        "//plugins/replication:replication",
+    ],
+)
+
+junit_tests(
+    name = "pull_replication_tests",
+    srcs = glob(["src/test/java/**/*Test.java"]),
+    tags = ["pull-replication"],
+    visibility = ["//visibility:public"],
+    deps = PLUGIN_TEST_DEPS + PLUGIN_DEPS + [
+        ":pull-replication__plugin",
+        "//plugins/replication:replication",
+    ],
+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..11069ed
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright [yyyy] [name of copyright owner]
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
new file mode 100644
index 0000000..baffff7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchAll.java
@@ -0,0 +1,101 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.common.Nullable;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.transport.URIish;
+
+public class FetchAll implements Runnable {
+  private final ReplicationStateListener stateLog;
+
+  public interface Factory {
+    FetchAll create(String urlMatch, ReplicationFilter filter, ReplicationState state, boolean now);
+  }
+
+  private final WorkQueue workQueue;
+  private final ProjectCache projectCache;
+  private final String urlMatch;
+  private final ReplicationFilter filter;
+  private final ReplicationState state;
+  private final boolean now;
+  private final SourcesCollection sources;
+
+  @Inject
+  protected FetchAll(
+      WorkQueue wq,
+      ProjectCache projectCache,
+      ReplicationStateListeners stateLog,
+      SourcesCollection sources,
+      @Assisted @Nullable String urlMatch,
+      @Assisted ReplicationFilter filter,
+      @Assisted ReplicationState state,
+      @Assisted boolean now) {
+    this.workQueue = wq;
+    this.projectCache = projectCache;
+    this.stateLog = stateLog;
+    this.sources = sources;
+    this.urlMatch = urlMatch;
+    this.filter = filter;
+    this.state = state;
+    this.now = now;
+  }
+
+  Future<?> schedule(long delay, TimeUnit unit) {
+    return workQueue.getDefaultQueue().schedule(this, delay, unit);
+  }
+
+  @Override
+  public void run() {
+    try {
+      for (Project.NameKey nameKey : projectCache.all()) {
+        if (filter.matches(nameKey)) {
+          scheduleFullSync(nameKey, urlMatch, state, now);
+        }
+      }
+    } catch (Exception e) {
+      stateLog.error("Cannot enumerate known projects", e, state);
+    }
+    state.markAllFetchTasksScheduled();
+  }
+
+  private void scheduleFullSync(
+      Project.NameKey project, String urlMatch, ReplicationState state, boolean now) {
+
+    for (Source cfg : sources.getAll()) {
+      if (cfg.wouldFetchProject(project)) {
+        for (URIish uri : cfg.getURIs(project, urlMatch)) {
+          cfg.schedule(project, FetchOne.ALL_REFS, uri, state, now);
+        }
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    String s = "Replicate All Projects";
+    if (urlMatch != null) {
+      s = s + " from " + urlMatch;
+    }
+    return s;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
new file mode 100644
index 0000000..7ee0da1
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -0,0 +1,470 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.git.ProjectRunnable;
+import com.google.gerrit.server.git.WorkQueue.CanceledWhileRunning;
+import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.util.IdGenerator;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.jcraft.jsch.JSchException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.eclipse.jgit.errors.NoRemoteRepositoryException;
+import org.eclipse.jgit.errors.NotSupportedException;
+import org.eclipse.jgit.errors.RemoteRepositoryException;
+import org.eclipse.jgit.errors.RepositoryNotFoundException;
+import org.eclipse.jgit.errors.TransportException;
+import org.eclipse.jgit.lib.NullProgressMonitor;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.FetchResult;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.TrackingRefUpdate;
+import org.eclipse.jgit.transport.Transport;
+import org.eclipse.jgit.transport.URIish;
+import org.slf4j.MDC;
+
+/**
+ * A pull from remote operation started by command-line.
+ *
+ * <p>Instance members are protected by the lock within FetchQueue. Callers must take that lock to
+ * ensure they are working with a current view of the object.
+ */
+class FetchOne implements ProjectRunnable, CanceledWhileRunning {
+  private final ReplicationStateListener stateLog;
+  static final String ALL_REFS = "..all..";
+  static final String ID_MDC_KEY = "fetchOneId";
+
+  interface Factory {
+    FetchOne create(Project.NameKey d, URIish u);
+  }
+
+  private final GitRepositoryManager gitManager;
+  private final Source pool;
+  private final RemoteConfig config;
+  private final CredentialsProvider credentialsProvider;
+  private final PerThreadRequestScope.Scoper threadScoper;
+
+  private final Project.NameKey projectName;
+  private final URIish uri;
+  private final Set<String> delta = Sets.newHashSetWithExpectedSize(4);
+  private boolean fetchAllRefs;
+  private Repository git;
+  private boolean retrying;
+  private int retryCount;
+  private final int maxRetries;
+  private boolean canceled;
+  private final ListMultimap<String, ReplicationState> stateMap = LinkedListMultimap.create();
+  private final int maxLockRetries;
+  private int lockRetryCount;
+  private final int id;
+  private final long createdAt;
+  private final FetchReplicationMetrics metrics;
+  private final AtomicBoolean canceledWhileRunning;
+
+  @Inject
+  FetchOne(
+      GitRepositoryManager grm,
+      Source s,
+      RemoteConfig c,
+      CredentialsFactory cpFactory,
+      PerThreadRequestScope.Scoper ts,
+      IdGenerator ig,
+      ReplicationStateListeners sl,
+      FetchReplicationMetrics m,
+      @Assisted Project.NameKey d,
+      @Assisted URIish u) {
+    gitManager = grm;
+    pool = s;
+    config = c;
+    credentialsProvider = cpFactory.create(c.getName());
+    threadScoper = ts;
+    projectName = d;
+    uri = u;
+    lockRetryCount = 0;
+    maxLockRetries = pool.getLockErrorMaxRetries();
+    id = ig.next();
+    stateLog = sl;
+    createdAt = System.nanoTime();
+    metrics = m;
+    canceledWhileRunning = new AtomicBoolean(false);
+    maxRetries = s.getMaxRetries();
+  }
+
+  @Override
+  public void cancel() {
+    repLog.info("Replication {} was canceled", getURI());
+    canceledByReplication();
+    pool.fetchWasCanceled(this);
+  }
+
+  @Override
+  public void setCanceledWhileRunning() {
+    repLog.info("Replication {} was canceled while being executed", getURI());
+    canceledWhileRunning.set(true);
+  }
+
+  @Override
+  public Project.NameKey getProjectNameKey() {
+    return projectName;
+  }
+
+  @Override
+  public String getRemoteName() {
+    return config.getName();
+  }
+
+  @Override
+  public boolean hasCustomizedPrint() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    String print = "[" + HexFormat.fromInt(id) + "] fetch " + uri;
+
+    if (retryCount > 0) {
+      print = "(retry " + retryCount + ") " + print;
+    }
+    return print;
+  }
+
+  boolean isRetrying() {
+    return retrying;
+  }
+
+  boolean setToRetry() {
+    retrying = true;
+    retryCount++;
+    return maxRetries == 0 || retryCount <= maxRetries;
+  }
+
+  void canceledByReplication() {
+    canceled = true;
+  }
+
+  boolean wasCanceled() {
+    return canceled;
+  }
+
+  URIish getURI() {
+    return uri;
+  }
+
+  void addRef(String ref) {
+    if (ALL_REFS.equals(ref)) {
+      delta.clear();
+      fetchAllRefs = true;
+      repLog.trace("Added all refs for replication from {}", uri);
+    } else if (!fetchAllRefs) {
+      delta.add(ref);
+      repLog.trace("Added ref {} for replication from {}", ref, uri);
+    }
+  }
+
+  Set<String> getRefs() {
+    return fetchAllRefs ? Sets.newHashSet(ALL_REFS) : delta;
+  }
+
+  void addRefs(Set<String> refs) {
+    if (!fetchAllRefs) {
+      for (String ref : refs) {
+        addRef(ref);
+      }
+    }
+  }
+
+  void addState(String ref, ReplicationState state) {
+    stateMap.put(ref, state);
+  }
+
+  ListMultimap<String, ReplicationState> getStates() {
+    return stateMap;
+  }
+
+  ReplicationState[] getStatesAsArray() {
+    Set<ReplicationState> statesSet = new HashSet<>();
+    statesSet.addAll(stateMap.values());
+    return statesSet.toArray(new ReplicationState[statesSet.size()]);
+  }
+
+  ReplicationState[] getStatesByRef(String ref) {
+    Collection<ReplicationState> states = stateMap.get(ref);
+    return states.toArray(new ReplicationState[states.size()]);
+  }
+
+  void addStates(ListMultimap<String, ReplicationState> states) {
+    stateMap.putAll(states);
+  }
+
+  void removeStates() {
+    stateMap.clear();
+  }
+
+  private void statesCleanUp() {
+    if (!stateMap.isEmpty() && !isRetrying()) {
+      for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
+        entry
+            .getValue()
+            .notifyRefReplicated(
+                projectName.get(),
+                entry.getKey(),
+                uri,
+                ReplicationState.RefFetchResult.FAILED,
+                null);
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      threadScoper
+          .scope(
+              new Callable<Void>() {
+                @Override
+                public Void call() {
+                  runFetchOperation();
+                  return null;
+                }
+              })
+          .call();
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    } finally {
+      statesCleanUp();
+    }
+  }
+
+  private void runFetchOperation() {
+    // Lock the queue, and remove ourselves, so we can't be modified once
+    // we start replication (instead a new instance, with the same URI, is
+    // created and scheduled for a future point in time.)
+    //
+    MDC.put(ID_MDC_KEY, HexFormat.fromInt(id));
+    if (!pool.requestRunway(this)) {
+      if (!canceled) {
+        repLog.info(
+            "Rescheduling replication to {} to avoid collision with an in-flight fetch.", uri);
+        pool.reschedule(this, Source.RetryReason.COLLISION);
+      }
+      return;
+    }
+
+    repLog.info("Replication from {} started...", uri);
+    Timer1.Context<String> context = metrics.start(config.getName());
+    try {
+      long startedAt = context.getStartTime();
+      long delay = NANOSECONDS.toMillis(startedAt - createdAt);
+      metrics.record(config.getName(), delay, retryCount);
+      git = gitManager.openRepository(projectName);
+      runImpl();
+      long elapsed = NANOSECONDS.toMillis(context.stop());
+      repLog.info(
+          "Replication from {} completed in {}ms, {}ms delay, {} retries",
+          uri,
+          elapsed,
+          delay,
+          retryCount);
+    } catch (RepositoryNotFoundException e) {
+      stateLog.error(
+          "Cannot replicate " + projectName + "; Local repository error: " + e.getMessage(),
+          getStatesAsArray());
+
+    } catch (NoRemoteRepositoryException | RemoteRepositoryException e) {
+      // Tried to replicate to a remote via anonymous git:// but the repository
+      // does not exist.  In this case NoRemoteRepositoryException is not
+      // raised.
+      String msg = e.getMessage();
+      repLog.error("Cannot replicate {}; Remote repository error: {}", projectName, msg);
+    } catch (NotSupportedException e) {
+      stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+    } catch (TransportException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
+        repLog.error("Cannot replicate from {}: {}", uri, cause.getMessage());
+      } else if (e instanceof LockFailureException) {
+        lockRetryCount++;
+        // The LockFailureException message contains both URI and reason
+        // for this failure.
+        repLog.error("Cannot replicate from {}: {}", uri, e.getMessage());
+
+        // The remote fetch operation should be retried.
+        if (lockRetryCount <= maxLockRetries) {
+          if (canceledWhileRunning.get()) {
+            logCanceledWhileRunningException(e);
+          } else {
+            pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+          }
+        } else {
+          repLog.error(
+              "Giving up after {} occurrences of this error: {} during replication from {}",
+              lockRetryCount,
+              e.getMessage(),
+              uri);
+        }
+      } else {
+        if (canceledWhileRunning.get()) {
+          logCanceledWhileRunningException(e);
+        } else {
+          repLog.error("Cannot replicate from {}", uri, e);
+          // The remote fetch operation should be retried.
+          pool.reschedule(this, Source.RetryReason.TRANSPORT_ERROR);
+        }
+      }
+    } catch (IOException e) {
+      stateLog.error("Cannot replicate from " + uri, e, getStatesAsArray());
+    } catch (RuntimeException | Error e) {
+      stateLog.error("Unexpected error during replication from " + uri, e, getStatesAsArray());
+    } finally {
+      if (git != null) {
+        git.close();
+      }
+      pool.notifyFinished(this);
+    }
+  }
+
+  private void logCanceledWhileRunningException(TransportException e) {
+    repLog.info("Cannot replicate from {}. It was canceled while running", uri, e);
+  }
+
+  private void runImpl() throws IOException {
+    FetchResult res;
+    try (Transport tn = Transport.open(git, uri)) {
+      res = fetchVia(tn);
+    }
+    updateStates(res.getTrackingRefUpdates());
+  }
+
+  private FetchResult fetchVia(Transport tn) throws IOException {
+    tn.applyConfig(config);
+    tn.setCredentialsProvider(credentialsProvider);
+
+    repLog.info("Fetch references {} from {}", config.getFetchRefSpecs(), uri);
+
+    return tn.fetch(NullProgressMonitor.INSTANCE, config.getFetchRefSpecs());
+  }
+
+  private void updateStates(Collection<TrackingRefUpdate> refUpdates) throws IOException {
+    Set<String> doneRefs = new HashSet<>();
+    boolean anyRefFailed = false;
+    RefUpdate.Result lastRefUpdateResult = RefUpdate.Result.NO_CHANGE;
+
+    for (TrackingRefUpdate u : refUpdates) {
+      ReplicationState.RefFetchResult fetchStatus = ReplicationState.RefFetchResult.SUCCEEDED;
+      Set<ReplicationState> logStates = new HashSet<>();
+      lastRefUpdateResult = u.getResult();
+
+      logStates.addAll(stateMap.get(u.getRemoteName()));
+      logStates.addAll(stateMap.get(ALL_REFS));
+      ReplicationState[] logStatesArray = logStates.toArray(new ReplicationState[logStates.size()]);
+
+      doneRefs.add(u.getRemoteName());
+      switch (u.getResult()) {
+        case NO_CHANGE:
+        case NEW:
+        case FORCED:
+        case RENAMED:
+        case FAST_FORWARD:
+          break;
+        case NOT_ATTEMPTED:
+        case REJECTED:
+        case REJECTED_CURRENT_BRANCH:
+        case REJECTED_MISSING_OBJECT:
+          stateLog.error(
+              String.format(
+                  "Failed replicate %s from %s: result %s", uri, u.getRemoteName(), u.getResult()),
+              logStatesArray);
+          fetchStatus = ReplicationState.RefFetchResult.FAILED;
+          anyRefFailed = true;
+          break;
+
+        case LOCK_FAILURE:
+          throw new LockFailureException(uri, u.toString());
+        case IO_FAILURE:
+          throw new IOException(u.toString());
+
+        case REJECTED_OTHER_REASON:
+          stateLog.error(
+              String.format(
+                  "Failed replicate %s from %s, reason: %s", uri, u.getRemoteName(), u.toString()),
+              logStatesArray);
+
+          fetchStatus = ReplicationState.RefFetchResult.FAILED;
+          anyRefFailed = true;
+          break;
+      }
+
+      for (ReplicationState rs : getStatesByRef(u.getRemoteName())) {
+        rs.notifyRefReplicated(
+            projectName.get(), u.getRemoteName(), uri, fetchStatus, u.getResult());
+      }
+    }
+
+    doneRefs.add(ALL_REFS);
+    for (ReplicationState rs : getStatesByRef(ALL_REFS)) {
+      rs.notifyRefReplicated(
+          projectName.get(),
+          ALL_REFS,
+          uri,
+          anyRefFailed
+              ? ReplicationState.RefFetchResult.FAILED
+              : ReplicationState.RefFetchResult.SUCCEEDED,
+          lastRefUpdateResult);
+    }
+    for (Map.Entry<String, ReplicationState> entry : stateMap.entries()) {
+      if (!doneRefs.contains(entry.getKey())) {
+        entry
+            .getValue()
+            .notifyRefReplicated(
+                projectName.get(),
+                entry.getKey(),
+                uri,
+                ReplicationState.RefFetchResult.NOT_ATTEMPTED,
+                null);
+      }
+    }
+    stateMap.clear();
+  }
+
+  public static class LockFailureException extends TransportException {
+    private static final long serialVersionUID = 1L;
+
+    LockFailureException(URIish uri, String message) {
+      super(uri, message);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
new file mode 100644
index 0000000..eb825b8
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicatedEvent.java
@@ -0,0 +1,80 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
+import org.eclipse.jgit.lib.RefUpdate;
+
+public class FetchRefReplicatedEvent extends RefEvent {
+  static final String TYPE = "fetch-ref-replicated";
+
+  final String project;
+  final String ref;
+  final String sourceNode;
+  final String status;
+  final RefUpdate.Result refUpdateResult;
+
+  public FetchRefReplicatedEvent(
+      String project,
+      String ref,
+      String sourceNode,
+      ReplicationState.RefFetchResult status,
+      RefUpdate.Result refUpdateResult) {
+    super(TYPE);
+    this.project = project;
+    this.ref = ref;
+    this.sourceNode = sourceNode;
+    this.status = status.toString();
+    this.refUpdateResult = refUpdateResult;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(project, ref, status, refUpdateResult);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof FetchRefReplicatedEvent)) {
+      return false;
+    }
+    FetchRefReplicatedEvent event = (FetchRefReplicatedEvent) other;
+    if (!Objects.equals(event.project, this.project)) {
+      return false;
+    }
+    if (!Objects.equals(event.ref, this.ref)) {
+      return false;
+    }
+    if (!Objects.equals(event.sourceNode, this.sourceNode)) {
+      return false;
+    }
+    if (!Objects.equals(event.status, this.status)) {
+      return false;
+    }
+    return Objects.equals(event.refUpdateResult, this.refUpdateResult);
+  }
+
+  @Override
+  public Project.NameKey getProjectNameKey() {
+    return Project.nameKey(project);
+  }
+
+  @Override
+  public String getRefName() {
+    return ref;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicationDoneEvent.java
new file mode 100644
index 0000000..a434702
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchRefReplicationDoneEvent.java
@@ -0,0 +1,65 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+import java.util.Objects;
+
+public class FetchRefReplicationDoneEvent extends RefEvent {
+  static final String TYPE = "fetch-ref-replication-done";
+
+  final String project;
+  final String ref;
+  final int nodesCount;
+
+  public FetchRefReplicationDoneEvent(String project, String ref, int nodesCount) {
+    super(TYPE);
+    this.project = project;
+    this.ref = ref;
+    this.nodesCount = nodesCount;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(TYPE, project, ref, nodesCount);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof FetchRefReplicationDoneEvent)) {
+      return false;
+    }
+
+    FetchRefReplicationDoneEvent event = (FetchRefReplicationDoneEvent) other;
+    if (!Objects.equals(event.project, this.project)) {
+      return false;
+    }
+    if (!Objects.equals(event.ref, this.ref)) {
+      return false;
+    }
+    return event.nodesCount == this.nodesCount;
+  }
+
+  @Override
+  public Project.NameKey getProjectNameKey() {
+    return Project.nameKey(project);
+  }
+
+  @Override
+  public String getRefName() {
+    return ref;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
new file mode 100644
index 0000000..e952252
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationMetrics.java
@@ -0,0 +1,90 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.PluginName;
+import com.google.gerrit.metrics.Description;
+import com.google.gerrit.metrics.Field;
+import com.google.gerrit.metrics.Histogram1;
+import com.google.gerrit.metrics.MetricMaker;
+import com.google.gerrit.metrics.Timer1;
+import com.google.gerrit.server.logging.PluginMetadata;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class FetchReplicationMetrics {
+  private final Timer1<String> executionTime;
+  private final Histogram1<String> executionDelay;
+  private final Histogram1<String> executionRetries;
+
+  @Inject
+  FetchReplicationMetrics(@PluginName String pluginName, MetricMaker metricMaker) {
+    Field<String> SOURCE_FIELD =
+        Field.ofString(
+                "source",
+                (metadataBuilder, fieldValue) ->
+                    metadataBuilder
+                        .pluginName(pluginName)
+                        .addPluginMetadata(PluginMetadata.create("source", fieldValue)))
+            .build();
+
+    executionTime =
+        metricMaker.newTimer(
+            "replication_latency",
+            new Description("Time spent fetching from remote source.")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SOURCE_FIELD);
+
+    executionDelay =
+        metricMaker.newHistogram(
+            "replication_delay",
+            new Description("Time spent waiting before fetching from remote source")
+                .setCumulative()
+                .setUnit(Description.Units.MILLISECONDS),
+            SOURCE_FIELD);
+
+    executionRetries =
+        metricMaker.newHistogram(
+            "replication_retries",
+            new Description("Number of retries when fetching from remote sources")
+                .setCumulative()
+                .setUnit("retries"),
+            SOURCE_FIELD);
+  }
+
+  /**
+   * Start the replication latency timer from a source.
+   *
+   * @param name the source name.
+   * @return the timer context.
+   */
+  public Timer1.Context<String> start(String name) {
+    return executionTime.start(name);
+  }
+
+  /**
+   * Record the replication delay and retry metrics for a source.
+   *
+   * @param name the source name.
+   * @param delay replication delay in milliseconds.
+   * @param retries number of retries.
+   */
+  public void record(String name, long delay, long retries) {
+    executionDelay.record(name, delay);
+    executionRetries.record(name, retries);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
new file mode 100644
index 0000000..c3d6aca
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationScheduledEvent.java
@@ -0,0 +1,43 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.server.events.RefEvent;
+
+public class FetchReplicationScheduledEvent extends RefEvent {
+  static final String TYPE = "fetch-ref-replication-scheduled";
+
+  final String project;
+  final String ref;
+  final String sourceNode;
+
+  public FetchReplicationScheduledEvent(String project, String ref, String sourceNode) {
+    super(TYPE);
+    this.project = project;
+    this.ref = ref;
+    this.sourceNode = sourceNode;
+  }
+
+  @Override
+  public String getRefName() {
+    return ref;
+  }
+
+  @Override
+  public Project.NameKey getProjectNameKey() {
+    return Project.nameKey(project);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
new file mode 100644
index 0000000..202d7e6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
@@ -0,0 +1,194 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.events.RefEvent;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+
+public abstract class FetchResultProcessing {
+
+  abstract void onOneProjectReplicationDone(
+      String project,
+      String ref,
+      URIish uri,
+      ReplicationState.RefFetchResult status,
+      RefUpdate.Result refUpdateResult);
+
+  abstract void onRefReplicatedFromAllNodes(String project, String ref, int nodesCount);
+
+  abstract void onAllRefsReplicatedFromAllNodes(int totalFetchTasksCount);
+
+  /**
+   * Write message to standard out.
+   *
+   * @param message message text.
+   */
+  void writeStdOut(String message) {
+    // Default doing nothing
+  }
+
+  /**
+   * Write message to standard error.
+   *
+   * @param message message text.
+   */
+  void writeStdErr(String message) {
+    // Default doing nothing
+  }
+
+  public static String resolveNodeName(URIish uri) {
+    StringBuilder sb = new StringBuilder();
+    if (uri.isRemote()) {
+      sb.append(uri.getHost());
+      if (uri.getPort() != -1) {
+        sb.append(":");
+        sb.append(uri.getPort());
+      }
+    } else {
+      sb.append(uri.getPath());
+    }
+    return sb.toString();
+  }
+
+  public static class CommandProcessing extends FetchResultProcessing {
+    private WeakReference<StartFetchCommand> sshCommand;
+    private AtomicBoolean hasError = new AtomicBoolean();
+
+    CommandProcessing(StartFetchCommand sshCommand) {
+      this.sshCommand = new WeakReference<>(sshCommand);
+    }
+
+    @Override
+    void onOneProjectReplicationDone(
+        String project,
+        String ref,
+        URIish uri,
+        ReplicationState.RefFetchResult status,
+        RefUpdate.Result refUpdateResult) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Replicate ");
+      sb.append(project);
+      sb.append(" ref ");
+      sb.append(ref);
+      sb.append(" from ");
+      sb.append(resolveNodeName(uri));
+      sb.append(", ");
+      switch (status) {
+        case SUCCEEDED:
+          sb.append("Succeeded!");
+          break;
+        case FAILED:
+          sb.append("FAILED!");
+          hasError.compareAndSet(false, true);
+          break;
+        case NOT_ATTEMPTED:
+          sb.append("NOT ATTEMPTED!");
+          break;
+        default:
+          sb.append("UNKNOWN RESULT!");
+          break;
+      }
+      sb.append(" (");
+      sb.append(refUpdateResult.toString());
+      sb.append(")");
+      writeStdOut(sb.toString());
+    }
+
+    @Override
+    void onRefReplicatedFromAllNodes(String project, String ref, int nodesCount) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Replication of ");
+      sb.append(project);
+      sb.append(" ref ");
+      sb.append(ref);
+      sb.append(" completed from ");
+      sb.append(nodesCount);
+      sb.append(" nodes, ");
+      writeStdOut(sb.toString());
+    }
+
+    @Override
+    void onAllRefsReplicatedFromAllNodes(int totalFetchTasksCount) {
+      if (totalFetchTasksCount == 0) {
+        return;
+      }
+      writeStdOut("----------------------------------------------");
+      if (hasError.get()) {
+        writeStdOut("Replication completed with some errors!");
+      } else {
+        writeStdOut("Replication completed successfully!");
+      }
+    }
+
+    @Override
+    void writeStdOut(String message) {
+      StartFetchCommand command = sshCommand.get();
+      if (command != null) {
+        command.writeStdOutSync(message);
+      }
+    }
+
+    @Override
+    void writeStdErr(String message) {
+      StartFetchCommand command = sshCommand.get();
+      if (command != null) {
+        command.writeStdErrSync(message);
+      }
+    }
+  }
+
+  public static class GitUpdateProcessing extends FetchResultProcessing {
+    private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+    private final EventDispatcher dispatcher;
+
+    public GitUpdateProcessing(EventDispatcher dispatcher) {
+      this.dispatcher = dispatcher;
+    }
+
+    @Override
+    void onOneProjectReplicationDone(
+        String project,
+        String ref,
+        URIish uri,
+        ReplicationState.RefFetchResult result,
+        RefUpdate.Result refUpdateResult) {
+      postEvent(
+          new FetchRefReplicatedEvent(project, ref, resolveNodeName(uri), result, refUpdateResult));
+    }
+
+    @Override
+    void onRefReplicatedFromAllNodes(String project, String ref, int nodesCount) {
+      postEvent(new FetchRefReplicationDoneEvent(project, ref, nodesCount));
+    }
+
+    @Override
+    void onAllRefsReplicatedFromAllNodes(int totalFetchTasksCount) {}
+
+    private void postEvent(RefEvent event) {
+      try {
+        dispatcher.postEvent(event);
+      } catch (PermissionBackendException e) {
+        logger.atSevere().withCause(e).log("Cannot post event");
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ListCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ListCommand.java
new file mode 100644
index 0000000..d7d55ec
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ListCommand.java
@@ -0,0 +1,126 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.base.Strings;
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.sshd.CommandMetaData;
+import com.google.gerrit.sshd.SshCommand;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.google.inject.Inject;
+import java.util.Collection;
+import java.util.List;
+import org.kohsuke.args4j.Option;
+
+@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
+@CommandMetaData(name = "list", description = "List remote source information")
+final class ListCommand extends SshCommand {
+  @Option(name = "--remote", metaVar = "PATTERN", usage = "pattern to match remote name on")
+  private String remote;
+
+  @Option(name = "--detail", usage = "output detailed information")
+  private boolean detail;
+
+  @Option(name = "--json", usage = "output in json format")
+  private boolean json;
+
+  @Inject private SourcesCollection sourcesCollection;
+
+  @Override
+  protected void run() {
+    for (Source s : sourcesCollection.getAll()) {
+      if (matches(s.getRemoteConfigName())) {
+        printRemote(s);
+      }
+    }
+  }
+
+  private boolean matches(String name) {
+    return (Strings.isNullOrEmpty(remote) || name.contains(remote) || name.matches(remote));
+  }
+
+  private void addProperty(JsonObject obj, String key, List<String> values) {
+    if (!values.isEmpty()) {
+      JsonArray list = new JsonArray();
+      for (String v : values) {
+        list.add(new JsonPrimitive(v));
+      }
+      obj.add(key, list);
+    }
+  }
+
+  private void addQueueDetails(StringBuilder out, Collection<FetchOne> values) {
+    for (FetchOne f : values) {
+      out.append("  ").append(f.toString()).append("\n");
+    }
+  }
+
+  private void addQueueDetails(JsonObject obj, String key, Collection<FetchOne> values) {
+    if (values.size() > 0) {
+      JsonArray list = new JsonArray();
+      for (FetchOne f : values) {
+        list.add(new JsonPrimitive(f.toString()));
+      }
+      obj.add(key, list);
+    }
+  }
+
+  private void printRemote(Source s) {
+    if (json) {
+      JsonObject obj = new JsonObject();
+      obj.addProperty("Remote", s.getRemoteConfigName());
+      addProperty(obj, "Url", s.getUrls());
+      if (detail) {
+        addProperty(obj, "AdminUrl", s.getAdminUrls());
+        addProperty(obj, "AuthGroup", s.getAuthGroupNames());
+        addProperty(obj, "Project", s.getProjects());
+        Source.QueueInfo q = s.getQueueInfo();
+        addQueueDetails(obj, "InFlight", q.inFlight.values());
+        addQueueDetails(obj, "Pending", q.pending.values());
+      }
+      stdout.print(obj.toString() + "\n");
+    } else {
+      StringBuilder out = new StringBuilder();
+      out.append("Remote: ").append(s.getRemoteConfigName()).append("\n");
+      for (String url : s.getUrls()) {
+        out.append("Url: ").append(url).append("\n");
+      }
+
+      if (detail) {
+        for (String adminUrl : s.getAdminUrls()) {
+          out.append("AdminUrl: ").append(adminUrl).append("\n");
+        }
+
+        for (String authGroup : s.getAuthGroupNames()) {
+          out.append("AuthGroup: ").append(authGroup).append("\n");
+        }
+
+        for (String project : s.getProjects()) {
+          out.append("Project: ").append(project).append("\n");
+        }
+
+        Source.QueueInfo q = s.getQueueInfo();
+        out.append("In Flight: ").append(q.inFlight.size()).append("\n");
+        addQueueDetails(out, q.inFlight.values());
+        out.append("Pending: ").append(q.pending.size()).append("\n");
+        addQueueDetails(out, q.pending.values());
+      }
+      stdout.print(out.toString() + "\n");
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/NoopObservableQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/NoopObservableQueue.java
new file mode 100644
index 0000000..0e752c7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/NoopObservableQueue.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.googlesource.gerrit.plugins.replication.ObservableQueue;
+
+public class NoopObservableQueue implements ObservableQueue {
+
+	@Override
+	public boolean isRunning() {
+		return true;
+	}
+
+	@Override
+	public boolean isReplaying() {
+		return false;
+	}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
new file mode 100644
index 0000000..d8c4a8d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/OnStartStop.java
@@ -0,0 +1,82 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.util.concurrent.Atomics;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class OnStartStop implements LifecycleListener {
+  private final AtomicReference<Future<?>> fetchAllFuture;
+  private final ServerInformation srvInfo;
+  private final FetchAll.Factory fetchAll;
+  private final ReplicationConfig config;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
+  private final ReplicationState.Factory replicationStateFactory;
+  private final SourcesCollection sourcesCollection;
+  private final WorkQueue workQueue;
+
+  @Inject
+  protected OnStartStop(
+      ServerInformation srvInfo,
+      FetchAll.Factory fetchAll,
+      ReplicationConfig config,
+      DynamicItem<EventDispatcher> eventDispatcher,
+      ReplicationState.Factory replicationStateFactory,
+      SourcesCollection sourcesCollection,
+      WorkQueue workQueue) {
+    this.srvInfo = srvInfo;
+    this.fetchAll = fetchAll;
+    this.config = config;
+    this.eventDispatcher = eventDispatcher;
+    this.replicationStateFactory = replicationStateFactory;
+    this.fetchAllFuture = Atomics.newReference();
+    this.sourcesCollection = sourcesCollection;
+    this.workQueue = workQueue;
+  }
+
+  @Override
+  public void start() {
+    if (srvInfo.getState() == ServerInformation.State.STARTUP
+        && config.isReplicateAllOnPluginStart()) {
+      ReplicationState state =
+          replicationStateFactory.create(
+              new FetchResultProcessing.GitUpdateProcessing(eventDispatcher.get()));
+      fetchAllFuture.set(
+          fetchAll
+              .create(null, ReplicationFilter.all(), state, false)
+              .schedule(30, TimeUnit.SECONDS));
+    }
+
+    sourcesCollection.startup(workQueue);
+  }
+
+  @Override
+  public void stop() {
+    Future<?> f = fetchAllFuture.getAndSet(null);
+    if (f != null) {
+      f.cancel(true);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogFile.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogFile.java
new file mode 100644
index 0000000..299648d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogFile.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2014 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import org.apache.log4j.PatternLayout;
+
+public class PullReplicationLogFile extends PluginLogFile {
+
+  @Inject
+  public PullReplicationLogFile(SystemLog systemLog, ServerInformation serverInfo) {
+    super(
+        systemLog,
+        serverInfo,
+        PullReplicationLogger.PULL_REPLICATION_LOG_NAME,
+        new PatternLayout("[%d] [%X{" + FetchOne.ID_MDC_KEY + "}] %m%n"));
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogger.java
new file mode 100644
index 0000000..25f883f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationLogger.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PullReplicationLogger {
+  public static final String PULL_REPLICATION_LOG_NAME = "pull_replication_log";
+  public static final Logger repLog = LoggerFactory.getLogger(PULL_REPLICATION_LOG_NAME);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
new file mode 100644
index 0000000..236c8b6
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -0,0 +1,107 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.StartReplicationCapability.START_REPLICATION;
+
+import com.google.common.eventbus.EventBus;
+import com.google.gerrit.extensions.annotations.Exports;
+import com.google.gerrit.extensions.config.CapabilityDefinition;
+import com.google.gerrit.extensions.events.LifecycleListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.config.SitePaths;
+import com.google.gerrit.server.events.EventTypes;
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.ProvisionException;
+import com.google.inject.Scopes;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.internal.UniqueAnnotations;
+import com.googlesource.gerrit.plugins.replication.AutoReloadConfigDecorator;
+import com.googlesource.gerrit.plugins.replication.AutoReloadSecureCredentialsFactoryDecorator;
+import com.googlesource.gerrit.plugins.replication.CredentialsFactory;
+import com.googlesource.gerrit.plugins.replication.ObservableQueue;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfig;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.util.FS;
+
+class PullReplicationModule extends AbstractModule {
+  private final Path cfgPath;
+
+  @Inject
+  public PullReplicationModule(SitePaths site) {
+    cfgPath = site.etc_dir.resolve("replication.config");
+  }
+
+  @Override
+  protected void configure() {
+    install(new FactoryModuleBuilder().build(Source.Factory.class));
+    bind(FetchReplicationMetrics.class).in(Scopes.SINGLETON);
+
+    bind(OnStartStop.class).in(Scopes.SINGLETON);
+    bind(LifecycleListener.class).annotatedWith(UniqueAnnotations.create()).to(OnStartStop.class);
+    bind(LifecycleListener.class)
+        .annotatedWith(UniqueAnnotations.create())
+        .to(PullReplicationLogFile.class);
+    bind(CredentialsFactory.class)
+        .to(AutoReloadSecureCredentialsFactoryDecorator.class)
+        .in(Scopes.SINGLETON);
+    bind(CapabilityDefinition.class)
+        .annotatedWith(Exports.named(START_REPLICATION))
+        .to(StartReplicationCapability.class);
+
+    install(new FactoryModuleBuilder().build(FetchAll.Factory.class));
+    install(new FactoryModuleBuilder().build(ReplicationState.Factory.class));
+
+    bind(EventBus.class).in(Scopes.SINGLETON);
+    bind(ReplicationSources.class).to(SourcesCollection.class);
+    bind(ObservableQueue.class).to(NoopObservableQueue.class).in(Scopes.SINGLETON);
+
+    bind(ReplicationConfigValidator.class).to(SourcesCollection.class);
+
+    if (getReplicationConfig().getBoolean("gerrit", "autoReload", false)) {
+      bind(ReplicationConfig.class).to(AutoReloadConfigDecorator.class);
+      bind(LifecycleListener.class)
+          .annotatedWith(UniqueAnnotations.create())
+          .to(AutoReloadConfigDecorator.class);
+    } else {
+      bind(ReplicationConfig.class).to(ReplicationFileBasedConfig.class);
+    }
+
+    DynamicSet.setOf(binder(), ReplicationStateListener.class);
+    DynamicSet.bind(binder(), ReplicationStateListener.class).to(PullReplicationStateLogger.class);
+    EventTypes.register(FetchRefReplicatedEvent.TYPE, FetchRefReplicatedEvent.class);
+    EventTypes.register(FetchRefReplicationDoneEvent.TYPE, FetchRefReplicationDoneEvent.class);
+    EventTypes.register(FetchReplicationScheduledEvent.TYPE, FetchReplicationScheduledEvent.class);
+  }
+
+  private FileBasedConfig getReplicationConfig() {
+    File replicationConfigFile = cfgPath.toFile();
+    FileBasedConfig config = new FileBasedConfig(replicationConfigFile, FS.DETECTED);
+    try {
+      config.load();
+    } catch (IOException | ConfigInvalidException e) {
+      throw new ProvisionException("Unable to load " + replicationConfigFile.getAbsolutePath(), e);
+    }
+    return config;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
new file mode 100644
index 0000000..7e60323
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
@@ -0,0 +1,56 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.pull.PullReplicationLogger.repLog;
+
+import com.google.inject.Singleton;
+
+/**
+ * Wrapper around a Logger that also logs out the replication state.
+ *
+ * <p>When logging replication errors it is useful to know the current replication state. This
+ * utility class wraps the methods from Logger and logs additional information about the replication
+ * state to the stderr console.
+ */
+@Singleton
+class PullReplicationStateLogger implements ReplicationStateListener {
+
+  @Override
+  public void warn(String msg, ReplicationState... states) {
+    stateWriteErr("Warning: " + msg, states);
+    repLog.warn(msg);
+  }
+
+  @Override
+  public void error(String msg, ReplicationState... states) {
+    stateWriteErr("Error: " + msg, states);
+    repLog.error(msg);
+  }
+
+  @Override
+  public void error(String msg, Throwable t, ReplicationState... states) {
+    stateWriteErr("Error: " + msg, states);
+    repLog.error(msg, t);
+  }
+
+  private void stateWriteErr(String msg, ReplicationState[] states) {
+    for (ReplicationState rs : states) {
+      if (rs != null) {
+        rs.writeStdErr(msg);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationSources.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationSources.java
new file mode 100644
index 0000000..25f42cd
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationSources.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.server.git.WorkQueue;
+import java.util.List;
+
+/** Git destinations currently active for replication. */
+public interface ReplicationSources {
+
+  /**
+   * List of currently active replication sources.
+   *
+   * @return the list of active sources
+   */
+  List<Source> getAll();
+
+  /** @return true if there are no destinations, false otherwise. */
+  boolean isEmpty();
+
+  /**
+   * Start replicating from all sources.
+   *
+   * @param workQueue execution queue for scheduling the replication events.
+   */
+  void startup(WorkQueue workQueue);
+
+  /**
+   * Stop the replication from all sources.
+   *
+   * @return number of events cancelled during shutdown.
+   */
+  int shutdown();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
new file mode 100644
index 0000000..5044496
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
@@ -0,0 +1,184 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.AssistedInject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+
+public class ReplicationState {
+
+  public interface Factory {
+    ReplicationState create(FetchResultProcessing processing);
+  }
+
+  private boolean allScheduled;
+  private final FetchResultProcessing fetchResultProcessing;
+
+  private final Lock countingLock = new ReentrantLock();
+  private final CountDownLatch allFetchTasksFinished = new CountDownLatch(1);
+
+  private static class RefReplicationStatus {
+    private final String project;
+    private final String ref;
+    private int projectsToReplicateCount;
+    private int replicatedNodesCount;
+
+    RefReplicationStatus(String project, String ref) {
+      this.project = project;
+      this.ref = ref;
+    }
+
+    public boolean allDone() {
+      return replicatedNodesCount == projectsToReplicateCount;
+    }
+  }
+
+  private final Table<String, String, RefReplicationStatus> statusByProjectRef;
+  private int totalFetchTasksCount;
+  private int finishedFetchTasksCount;
+
+  @AssistedInject
+  ReplicationState(@Assisted FetchResultProcessing processing) {
+    fetchResultProcessing = processing;
+    statusByProjectRef = HashBasedTable.create();
+  }
+
+  public void increaseFetchTaskCount(String project, String ref) {
+    countingLock.lock();
+    try {
+      getRefStatus(project, ref).projectsToReplicateCount++;
+      totalFetchTasksCount++;
+    } finally {
+      countingLock.unlock();
+    }
+  }
+
+  public boolean hasFetchTask() {
+    return totalFetchTasksCount != 0;
+  }
+
+  public void notifyRefReplicated(
+      String project,
+      String ref,
+      URIish uri,
+      RefFetchResult status,
+      RefUpdate.Result refUpdateResult) {
+    fetchResultProcessing.onOneProjectReplicationDone(project, ref, uri, status, refUpdateResult);
+
+    RefReplicationStatus completedRefStatus = null;
+    boolean allFetchTasksCompleted = false;
+    countingLock.lock();
+    try {
+      RefReplicationStatus refStatus = getRefStatus(project, ref);
+      refStatus.replicatedNodesCount++;
+      finishedFetchTasksCount++;
+
+      if (allScheduled) {
+        if (refStatus.allDone()) {
+          completedRefStatus = statusByProjectRef.remove(project, ref);
+        }
+        allFetchTasksCompleted = finishedFetchTasksCount == totalFetchTasksCount;
+      }
+    } finally {
+      countingLock.unlock();
+    }
+
+    if (completedRefStatus != null) {
+      doRefFetchTasksCompleted(completedRefStatus);
+    }
+
+    if (allFetchTasksCompleted) {
+      doAllFetchTasksCompleted();
+    }
+  }
+
+  public void markAllFetchTasksScheduled() {
+    countingLock.lock();
+    try {
+      allScheduled = true;
+      if (finishedFetchTasksCount < totalFetchTasksCount) {
+        return;
+      }
+    } finally {
+      countingLock.unlock();
+    }
+
+    doAllFetchTasksCompleted();
+  }
+
+  private void doAllFetchTasksCompleted() {
+    fireRemainingOnRefReplicatedFromAllNodes();
+    fetchResultProcessing.onAllRefsReplicatedFromAllNodes(totalFetchTasksCount);
+    allFetchTasksFinished.countDown();
+  }
+
+  /**
+   * Some could be remaining if replication of a ref is completed before all tasks are scheduled.
+   */
+  private void fireRemainingOnRefReplicatedFromAllNodes() {
+    for (RefReplicationStatus refStatus : statusByProjectRef.values()) {
+      doRefFetchTasksCompleted(refStatus);
+    }
+  }
+
+  private void doRefFetchTasksCompleted(RefReplicationStatus refStatus) {
+    fetchResultProcessing.onRefReplicatedFromAllNodes(
+        refStatus.project, refStatus.ref, refStatus.projectsToReplicateCount);
+  }
+
+  private RefReplicationStatus getRefStatus(String project, String ref) {
+    if (!statusByProjectRef.contains(project, ref)) {
+      RefReplicationStatus refStatus = new RefReplicationStatus(project, ref);
+      statusByProjectRef.put(project, ref, refStatus);
+      return refStatus;
+    }
+    return statusByProjectRef.get(project, ref);
+  }
+
+  public void waitForReplication() throws InterruptedException {
+    allFetchTasksFinished.await();
+  }
+
+  public void writeStdOut(String message) {
+    fetchResultProcessing.writeStdOut(message);
+  }
+
+  public void writeStdErr(String message) {
+    fetchResultProcessing.writeStdErr(message);
+  }
+
+  public enum RefFetchResult {
+    /** The ref was not successfully replicated. */
+    FAILED,
+
+    /** The ref is not configured to be replicated. */
+    NOT_ATTEMPTED,
+
+    /** The ref was successfully replicated. */
+    SUCCEEDED;
+
+    @Override
+    public String toString() {
+      return name().toLowerCase().replace("_", "-");
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListener.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListener.java
new file mode 100644
index 0000000..c4b11b7
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListener.java
@@ -0,0 +1,50 @@
+// Copyright (C) 2015 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+/** Interface for notifying replication status updates. */
+public interface ReplicationStateListener {
+
+  /**
+   * Notify a non-fatal replication error.
+   *
+   * <p>Replication states received a non-fatal error with an associated warning message.
+   *
+   * @param msg message description of the error
+   * @param states replication states impacted
+   */
+  void warn(String msg, ReplicationState... states);
+
+  /**
+   * Notify a fatal replication error.
+   *
+   * <p>Replication states have received a fatal error and replication has failed.
+   *
+   * @param msg message description of the error
+   * @param states replication states impacted
+   */
+  void error(String msg, ReplicationState... states);
+
+  /**
+   * Notify a fatal replication error with the associated exception.
+   *
+   * <p>Replication states have received a fatal exception and replication has failed.
+   *
+   * @param msg message description of the error
+   * @param t exception that caused the replication to fail
+   * @param states replication states impacted
+   */
+  void error(String msg, Throwable t, ReplicationState... states);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListeners.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListeners.java
new file mode 100644
index 0000000..f1db73c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateListeners.java
@@ -0,0 +1,48 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.Inject;
+
+public class ReplicationStateListeners implements ReplicationStateListener {
+  private final DynamicSet<ReplicationStateListener> listeners;
+
+  @Inject
+  ReplicationStateListeners(DynamicSet<ReplicationStateListener> stateListeners) {
+    this.listeners = stateListeners;
+  }
+
+  @Override
+  public void warn(String msg, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.warn(msg, states);
+    }
+  }
+
+  @Override
+  public void error(String msg, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.error(msg, states);
+    }
+  }
+
+  @Override
+  public void error(String msg, Throwable t, ReplicationState... states) {
+    for (ReplicationStateListener listener : listeners) {
+      listener.error(msg, t, states);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
new file mode 100644
index 0000000..dbe8459
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -0,0 +1,636 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig.replaceName;
+import static com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.resolveNodeName;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSet.Builder;
+import com.google.common.collect.Lists;
+import com.google.gerrit.common.data.GroupReference;
+import com.google.gerrit.entities.AccountGroup;
+import com.google.gerrit.entities.BranchNameKey;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.entities.RefNames;
+import com.google.gerrit.extensions.config.FactoryModule;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.AuthException;
+import com.google.gerrit.server.CurrentUser;
+import com.google.gerrit.server.PluginUser;
+import com.google.gerrit.server.account.GroupBackend;
+import com.google.gerrit.server.account.GroupBackends;
+import com.google.gerrit.server.account.GroupIncludeCache;
+import com.google.gerrit.server.account.ListGroupMembership;
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.PerThreadRequestScope;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.permissions.PermissionBackend;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.google.gerrit.server.permissions.ProjectPermission;
+import com.google.gerrit.server.permissions.RefPermission;
+import com.google.gerrit.server.project.NoSuchProjectException;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gerrit.server.util.RequestContext;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Provider;
+import com.google.inject.Provides;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.servlet.RequestScoped;
+import com.googlesource.gerrit.plugins.replication.RemoteSiteUser;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FilenameUtils;
+import org.eclipse.jgit.lib.Constants;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+import org.slf4j.Logger;
+
+public class Source {
+  private static final Logger repLog = PullReplicationLogger.repLog;
+
+  public interface Factory {
+    Source create(SourceConfiguration config);
+  }
+
+  private final ReplicationStateListener stateLog;
+  private final Object stateLock = new Object();
+  private final Map<URIish, FetchOne> pending = new HashMap<>();
+  private final Map<URIish, FetchOne> inFlight = new HashMap<>();
+  private final FetchOne.Factory opFactory;
+  private final GitRepositoryManager gitManager;
+  private final PermissionBackend permissionBackend;
+  private final Provider<CurrentUser> userProvider;
+  private final ProjectCache projectCache;
+  private volatile ScheduledExecutorService pool;
+  private final PerThreadRequestScope.Scoper threadScoper;
+  private final SourceConfiguration config;
+  private final DynamicItem<EventDispatcher> eventDispatcher;
+
+  protected enum RetryReason {
+    TRANSPORT_ERROR,
+    COLLISION,
+    REPOSITORY_MISSING
+  }
+
+  public static class QueueInfo {
+    public final Map<URIish, FetchOne> pending;
+    public final Map<URIish, FetchOne> inFlight;
+
+    public QueueInfo(Map<URIish, FetchOne> pending, Map<URIish, FetchOne> inFlight) {
+      this.pending = ImmutableMap.copyOf(pending);
+      this.inFlight = ImmutableMap.copyOf(inFlight);
+    }
+  }
+
+  @Inject
+  protected Source(
+      Injector injector,
+      @Assisted SourceConfiguration cfg,
+      PluginUser pluginUser,
+      GitRepositoryManager gitRepositoryManager,
+      PermissionBackend permissionBackend,
+      Provider<CurrentUser> userProvider,
+      ProjectCache projectCache,
+      GroupBackend groupBackend,
+      ReplicationStateListeners stateLog,
+      GroupIncludeCache groupIncludeCache,
+      DynamicItem<EventDispatcher> eventDispatcher) {
+    config = cfg;
+    this.eventDispatcher = eventDispatcher;
+    gitManager = gitRepositoryManager;
+    this.permissionBackend = permissionBackend;
+    this.userProvider = userProvider;
+    this.projectCache = projectCache;
+    this.stateLog = stateLog;
+
+    CurrentUser remoteUser;
+    if (!cfg.getAuthGroupNames().isEmpty()) {
+      Builder<AccountGroup.UUID> builder = ImmutableSet.builder();
+      for (String name : cfg.getAuthGroupNames()) {
+        GroupReference g = GroupBackends.findExactSuggestion(groupBackend, name);
+        if (g != null) {
+          builder.add(g.getUUID());
+          addRecursiveParents(g.getUUID(), builder, groupIncludeCache);
+        } else {
+          repLog.warn("Group \"{}\" not recognized, removing from authGroup", name);
+        }
+      }
+      remoteUser = new RemoteSiteUser(new ListGroupMembership(builder.build()));
+    } else {
+      remoteUser = pluginUser;
+    }
+
+    Injector child =
+        injector.createChildInjector(
+            new FactoryModule() {
+              @Override
+              protected void configure() {
+                bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
+                bind(PerThreadRequestScope.Propagator.class);
+
+                bind(Source.class).toInstance(Source.this);
+                bind(RemoteConfig.class).toInstance(config.getRemoteConfig());
+                install(new FactoryModuleBuilder().build(FetchOne.Factory.class));
+              }
+
+              @Provides
+              public PerThreadRequestScope.Scoper provideScoper(
+                  final PerThreadRequestScope.Propagator propagator) {
+                final RequestContext requestContext =
+                    new RequestContext() {
+                      @Override
+                      public CurrentUser getUser() {
+                        return remoteUser;
+                      }
+                    };
+                return new PerThreadRequestScope.Scoper() {
+                  @Override
+                  public <T> Callable<T> scope(Callable<T> callable) {
+                    return propagator.scope(requestContext, callable);
+                  }
+                };
+              }
+            });
+
+    opFactory = child.getInstance(FetchOne.Factory.class);
+    threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
+  }
+
+  private void addRecursiveParents(
+      AccountGroup.UUID g,
+      Builder<AccountGroup.UUID> builder,
+      GroupIncludeCache groupIncludeCache) {
+    for (AccountGroup.UUID p : groupIncludeCache.parentGroupsOf(g)) {
+      if (builder.build().contains(p)) {
+        continue;
+      }
+      builder.add(p);
+      addRecursiveParents(p, builder, groupIncludeCache);
+    }
+  }
+
+  public QueueInfo getQueueInfo() {
+    synchronized (stateLock) {
+      return new QueueInfo(pending, inFlight);
+    }
+  }
+
+  public void start(WorkQueue workQueue) {
+    String poolName = "ReplicateFrom-" + config.getRemoteConfig().getName();
+    pool = workQueue.createQueue(config.getPoolThreads(), poolName);
+  }
+
+  public int shutdown() {
+    int cnt = 0;
+    if (pool != null) {
+      cnt = pool.shutdownNow().size();
+      pool = null;
+    }
+    return cnt;
+  }
+
+  private boolean shouldReplicate(ProjectState state, CurrentUser user)
+      throws PermissionBackendException {
+    if (!config.replicateHiddenProjects()
+        && state.getProject().getState()
+            == com.google.gerrit.extensions.client.ProjectState.HIDDEN) {
+      return false;
+    }
+
+    // Hidden projects(permitsRead = false) should only be accessible by the project owners.
+    // READ_CONFIG is checked here because it's only allowed to project owners(ACCESS may also
+    // be allowed for other users).
+    ProjectPermission permissionToCheck =
+        state.statePermitsRead() ? ProjectPermission.ACCESS : ProjectPermission.READ_CONFIG;
+    try {
+      permissionBackend.user(user).project(state.getNameKey()).check(permissionToCheck);
+      return true;
+    } catch (AuthException e) {
+      return false;
+    }
+  }
+
+  private boolean shouldReplicate(
+      final Project.NameKey project, String ref, ReplicationState... states) {
+    try {
+      return threadScoper
+          .scope(
+              new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws NoSuchProjectException, PermissionBackendException {
+                  ProjectState projectState;
+                  try {
+                    projectState = projectCache.checkedGet(project);
+                  } catch (IOException e) {
+                    return false;
+                  }
+                  if (projectState == null) {
+                    throw new NoSuchProjectException(project);
+                  }
+                  if (!projectState.statePermitsRead()) {
+                    return false;
+                  }
+                  if (!shouldReplicate(projectState, userProvider.get())) {
+                    return false;
+                  }
+                  if (FetchOne.ALL_REFS.equals(ref)) {
+                    return true;
+                  }
+                  try {
+                    permissionBackend
+                        .user(userProvider.get())
+                        .project(project)
+                        .ref(ref)
+                        .check(RefPermission.READ);
+                  } catch (AuthException e) {
+                    return false;
+                  }
+                  return true;
+                }
+              })
+          .call();
+    } catch (NoSuchProjectException err) {
+      stateLog.error(String.format("source project %s not available", project), err, states);
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+    return false;
+  }
+
+  private boolean shouldReplicate(Project.NameKey project, ReplicationState... states) {
+    try {
+      return threadScoper
+          .scope(
+              new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws NoSuchProjectException, PermissionBackendException {
+                  ProjectState projectState;
+                  try {
+                    projectState = projectCache.checkedGet(project);
+                  } catch (IOException e) {
+                    return false;
+                  }
+                  if (projectState == null) {
+                    throw new NoSuchProjectException(project);
+                  }
+                  return shouldReplicate(projectState, userProvider.get());
+                }
+              })
+          .call();
+    } catch (NoSuchProjectException err) {
+      stateLog.error(String.format("source project %s not available", project), err, states);
+    } catch (Exception e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+    return false;
+  }
+
+  void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) {
+    schedule(project, ref, uri, state, false);
+  }
+
+  void schedule(
+      Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
+    repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
+    if (!shouldReplicate(project, ref, state)) {
+      return;
+    }
+
+    if (!config.replicatePermissions()) {
+      FetchOne e;
+      synchronized (stateLock) {
+        e = pending.get(uri);
+      }
+      if (e == null) {
+        try (Repository git = gitManager.openRepository(project)) {
+          try {
+            Ref head = git.exactRef(Constants.HEAD);
+            if (head != null
+                && head.isSymbolic()
+                && RefNames.REFS_CONFIG.equals(head.getLeaf().getName())) {
+              return;
+            }
+          } catch (IOException err) {
+            stateLog.error(String.format("cannot check type of project %s", project), err, state);
+            return;
+          }
+        } catch (IOException err) {
+          stateLog.error(String.format("source project %s not available", project), err, state);
+          return;
+        }
+      }
+    }
+
+    synchronized (stateLock) {
+      FetchOne e = pending.get(uri);
+      if (e == null) {
+        e = opFactory.create(project, uri);
+        addRef(e, ref);
+        e.addState(ref, state);
+        pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        pending.put(uri, e);
+      } else if (!e.getRefs().contains(ref)) {
+        addRef(e, ref);
+        e.addState(ref, state);
+      }
+      state.increaseFetchTaskCount(project.get(), ref);
+      repLog.info("scheduled {}:{} => {} to run after {}s", e, ref, project, config.getDelay());
+    }
+  }
+
+  void fetchWasCanceled(FetchOne fetchOp) {
+    synchronized (stateLock) {
+      URIish uri = fetchOp.getURI();
+      pending.remove(uri);
+    }
+  }
+
+  private void addRef(FetchOne e, String ref) {
+    e.addRef(ref);
+    postReplicationScheduledEvent(e, ref);
+  }
+
+  /**
+   * It schedules again a FetchOp instance.
+   *
+   * <p>If the reason for rescheduling is to avoid a collision with an in-flight push to the same
+   * URI, we don't mark the operation as "retrying," and we schedule using the replication delay,
+   * rather than the retry delay. Otherwise, the operation is marked as "retrying" and scheduled to
+   * run following the minutes count determined by class attribute retryDelay.
+   *
+   * <p>In case the FetchOp instance to be scheduled has same URI than one marked as "retrying," it
+   * adds to the one pending the refs list of the parameter instance.
+   *
+   * <p>In case the FetchOp instance to be scheduled has the same URI as one pending, but not marked
+   * "retrying," it indicates the one pending should be canceled when it starts executing, removes
+   * it from pending list, and adds its refs to the parameter instance. The parameter instance is
+   * scheduled for retry.
+   *
+   * <p>Notice all operations to indicate a FetchOp should be canceled, or it is retrying, or
+   * remove/add it from/to pending Map should be protected by synchronizing on the stateLock object.
+   *
+   * @param fetchOp The FetchOp instance to be scheduled.
+   */
+  void reschedule(FetchOne fetchOp, RetryReason reason) {
+    synchronized (stateLock) {
+      URIish uri = fetchOp.getURI();
+      FetchOne pendingFetchOp = pending.get(uri);
+
+      if (pendingFetchOp != null) {
+        // There is one FetchOp instance already pending to same URI.
+
+        if (pendingFetchOp.isRetrying()) {
+          // The one pending is one already retrying, so it should
+          // maintain it and add to it the refs of the one passed
+          // as parameter to the method.
+
+          // This scenario would happen if a FetchOp has started running
+          // and then before it failed due transport exception, another
+          // one to same URI started. The first one would fail and would
+          // be rescheduled, being present in pending list. When the
+          // second one fails, it will also be rescheduled and then,
+          // here, find out replication to its URI is already pending
+          // for retry (blocking).
+          pendingFetchOp.addRefs(fetchOp.getRefs());
+          pendingFetchOp.addStates(fetchOp.getStates());
+          fetchOp.removeStates();
+
+        } else {
+          // The one pending is one that is NOT retrying, it was just
+          // scheduled believing no problem would happen. The one pending
+          // should be canceled, and this is done by setting its canceled
+          // flag, removing it from pending list, and adding its refs to
+          // the fetchOp instance that should then, later, in this method,
+          // be scheduled for retry.
+
+          // Notice that the FetchOp found pending will start running and,
+          // when notifying it is starting (with pending lock protection),
+          // it will see it was canceled and then it will do nothing with
+          // pending list and it will not execute its run implementation.
+          pendingFetchOp.canceledByReplication();
+          pending.remove(uri);
+
+          fetchOp.addRefs(pendingFetchOp.getRefs());
+          fetchOp.addStates(pendingFetchOp.getStates());
+          pendingFetchOp.removeStates();
+        }
+      }
+
+      if (pendingFetchOp == null || !pendingFetchOp.isRetrying()) {
+        pending.put(uri, fetchOp);
+        switch (reason) {
+          case COLLISION:
+            pool.schedule(fetchOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+            break;
+          case TRANSPORT_ERROR:
+          case REPOSITORY_MISSING:
+          default:
+            RefUpdate.Result trackingRefUpdate =
+                RetryReason.REPOSITORY_MISSING.equals(reason)
+                    ? RefUpdate.Result.NOT_ATTEMPTED
+                    : RefUpdate.Result.REJECTED_OTHER_REASON;
+            postReplicationFailedEvent(fetchOp, trackingRefUpdate);
+            if (fetchOp.setToRetry()) {
+              postReplicationScheduledEvent(fetchOp);
+              pool.schedule(fetchOp, config.getRetryDelay(), TimeUnit.MINUTES);
+            } else {
+              fetchOp.canceledByReplication();
+              pending.remove(uri);
+              stateLog.error(
+                  "Fetch from " + fetchOp.getURI() + " cancelled after maximum number of retries",
+                  fetchOp.getStatesAsArray());
+            }
+            break;
+        }
+      }
+    }
+  }
+
+  boolean requestRunway(FetchOne op) {
+    synchronized (stateLock) {
+      if (op.wasCanceled()) {
+        return false;
+      }
+      pending.remove(op.getURI());
+      if (inFlight.containsKey(op.getURI())) {
+        return false;
+      }
+      inFlight.put(op.getURI(), op);
+    }
+    return true;
+  }
+
+  void notifyFinished(FetchOne op) {
+    synchronized (stateLock) {
+      inFlight.remove(op.getURI());
+    }
+  }
+
+  boolean wouldFetchProject(Project.NameKey project) {
+    if (!shouldReplicate(project)) {
+      return false;
+    }
+
+    // by default fetch all projects
+    List<String> projects = config.getProjects();
+    if (projects.isEmpty()) {
+      return true;
+    }
+
+    return (new ReplicationFilter(projects)).matches(project);
+  }
+
+  public boolean isSingleProjectMatch() {
+    return config.isSingleProjectMatch();
+  }
+
+  List<URIish> getURIs(Project.NameKey project, String urlMatch) {
+    List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
+    for (URIish uri : config.getRemoteConfig().getURIs()) {
+      if (matches(uri, urlMatch)) {
+        String name = project.get();
+        if (needsUrlEncoding(uri)) {
+          name = encode(name);
+        }
+        String remoteNameStyle = config.getRemoteNameStyle();
+        if (remoteNameStyle.equals("dash")) {
+          name = name.replace("/", "-");
+        } else if (remoteNameStyle.equals("underscore")) {
+          name = name.replace("/", "_");
+        } else if (remoteNameStyle.equals("basenameOnly")) {
+          name = FilenameUtils.getBaseName(name);
+        } else if (!remoteNameStyle.equals("slash")) {
+          repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
+        }
+        String replacedPath = replaceName(uri.getPath(), name, isSingleProjectMatch());
+        if (replacedPath != null) {
+          uri = uri.setPath(replacedPath);
+          r.add(uri);
+        }
+      }
+    }
+    return r;
+  }
+
+  static boolean needsUrlEncoding(URIish uri) {
+    return "http".equalsIgnoreCase(uri.getScheme())
+        || "https".equalsIgnoreCase(uri.getScheme())
+        || "amazon-s3".equalsIgnoreCase(uri.getScheme());
+  }
+
+  static String encode(String str) {
+    try {
+      // Some cleanup is required. The '/' character is always encoded as %2F
+      // however remote servers will expect it to be not encoded as part of the
+      // path used to the repository. Space is incorrectly encoded as '+' for this
+      // context. In the path part of a URI space should be %20, but in form data
+      // space is '+'. Our cleanup replace fixes these two issues.
+      return URLEncoder.encode(str, "UTF-8").replaceAll("%2[fF]", "/").replace("+", "%20");
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  ImmutableList<String> getAdminUrls() {
+    return config.getAdminUrls();
+  }
+
+  ImmutableList<String> getUrls() {
+    return config.getUrls();
+  }
+
+  ImmutableList<String> getAuthGroupNames() {
+    return config.getAuthGroupNames();
+  }
+
+  ImmutableList<String> getProjects() {
+    return config.getProjects();
+  }
+
+  int getLockErrorMaxRetries() {
+    return config.getLockErrorMaxRetries();
+  }
+
+  String getRemoteConfigName() {
+    return config.getRemoteConfig().getName();
+  }
+
+  public int getMaxRetries() {
+    return config.getMaxRetries();
+  }
+
+  private static boolean matches(URIish uri, String urlMatch) {
+    if (urlMatch == null || urlMatch.equals("") || urlMatch.equals("*")) {
+      return true;
+    }
+    return uri.toString().contains(urlMatch);
+  }
+
+  private void postReplicationScheduledEvent(FetchOne fetchOp) {
+    postReplicationScheduledEvent(fetchOp, null);
+  }
+
+  private void postReplicationScheduledEvent(FetchOne fetchOp, String inputRef) {
+    Set<String> refs = inputRef == null ? fetchOp.getRefs() : ImmutableSet.of(inputRef);
+    Project.NameKey project = fetchOp.getProjectNameKey();
+    String targetNode = resolveNodeName(fetchOp.getURI());
+    for (String ref : refs) {
+      FetchReplicationScheduledEvent event =
+          new FetchReplicationScheduledEvent(project.get(), ref, targetNode);
+      try {
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
+      } catch (PermissionBackendException e) {
+        repLog.error("error posting event", e);
+      }
+    }
+  }
+
+  private void postReplicationFailedEvent(FetchOne fetchOp, RefUpdate.Result result) {
+    Project.NameKey project = fetchOp.getProjectNameKey();
+    String sourceNode = resolveNodeName(fetchOp.getURI());
+    for (String ref : fetchOp.getRefs()) {
+      FetchRefReplicatedEvent event =
+          new FetchRefReplicatedEvent(
+              project.get(), ref, sourceNode, ReplicationState.RefFetchResult.FAILED, result);
+      try {
+        eventDispatcher.get().postEvent(BranchNameKey.create(project, ref), event);
+      } catch (PermissionBackendException e) {
+        repLog.error("error posting event", e);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
new file mode 100644
index 0000000..8ef68b0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourceConfiguration.java
@@ -0,0 +1,153 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.gerrit.server.config.ConfigUtil;
+import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jgit.lib.Config;
+import org.eclipse.jgit.transport.RemoteConfig;
+
+public class SourceConfiguration implements RemoteConfiguration {
+  static final int DEFAULT_REPLICATION_DELAY = 15;
+  static final int DEFAULT_RESCHEDULE_DELAY = 3;
+  static final int DEFAULT_SLOW_LATENCY_THRESHOLD_SECS = 900;
+
+  private final int delay;
+  private final int rescheduleDelay;
+  private final int retryDelay;
+  private final int lockErrorMaxRetries;
+  private final ImmutableList<String> adminUrls;
+  private final int poolThreads;
+  private final boolean replicatePermissions;
+  private final boolean replicateHiddenProjects;
+  private final String remoteNameStyle;
+  private final ImmutableList<String> urls;
+  private final ImmutableList<String> projects;
+  private final ImmutableList<String> authGroupNames;
+  private final RemoteConfig remoteConfig;
+  private final int maxRetries;
+  private int slowLatencyThreshold;
+
+  public SourceConfiguration(RemoteConfig remoteConfig, Config cfg) {
+    this.remoteConfig = remoteConfig;
+    String name = remoteConfig.getName();
+    urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url"));
+    delay = Math.max(0, getInt(remoteConfig, cfg, "replicationdelay", DEFAULT_REPLICATION_DELAY));
+    rescheduleDelay =
+        Math.max(3, getInt(remoteConfig, cfg, "rescheduledelay", DEFAULT_RESCHEDULE_DELAY));
+    projects = ImmutableList.copyOf(cfg.getStringList("remote", name, "projects"));
+    adminUrls = ImmutableList.copyOf(cfg.getStringList("remote", name, "adminUrl"));
+    retryDelay = Math.max(0, getInt(remoteConfig, cfg, "replicationretry", 1));
+    poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
+    authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
+    lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
+
+    replicatePermissions = cfg.getBoolean("remote", name, "replicatePermissions", true);
+    replicateHiddenProjects = cfg.getBoolean("remote", name, "replicateHiddenProjects", false);
+    remoteNameStyle =
+        MoreObjects.firstNonNull(cfg.getString("remote", name, "remoteNameStyle"), "slash");
+    maxRetries =
+        getInt(
+            remoteConfig, cfg, "replicationMaxRetries", cfg.getInt("replication", "maxRetries", 0));
+    slowLatencyThreshold =
+        (int)
+            ConfigUtil.getTimeUnit(
+                cfg,
+                "remote",
+                remoteConfig.getName(),
+                "slowLatencyThreshold",
+                DEFAULT_SLOW_LATENCY_THRESHOLD_SECS,
+                TimeUnit.SECONDS);
+  }
+
+  @Override
+  public int getDelay() {
+    return delay;
+  }
+
+  @Override
+  public int getRescheduleDelay() {
+    return rescheduleDelay;
+  }
+
+  @Override
+  public int getRetryDelay() {
+    return retryDelay;
+  }
+
+  public int getPoolThreads() {
+    return poolThreads;
+  }
+
+  public int getLockErrorMaxRetries() {
+    return lockErrorMaxRetries;
+  }
+
+  @Override
+  public ImmutableList<String> getUrls() {
+    return urls;
+  }
+
+  @Override
+  public ImmutableList<String> getAdminUrls() {
+    return adminUrls;
+  }
+
+  @Override
+  public ImmutableList<String> getProjects() {
+    return projects;
+  }
+
+  @Override
+  public ImmutableList<String> getAuthGroupNames() {
+    return authGroupNames;
+  }
+
+  @Override
+  public String getRemoteNameStyle() {
+    return remoteNameStyle;
+  }
+
+  @Override
+  public boolean replicatePermissions() {
+    return replicatePermissions;
+  }
+
+  public boolean replicateHiddenProjects() {
+    return replicateHiddenProjects;
+  }
+
+  @Override
+  public RemoteConfig getRemoteConfig() {
+    return remoteConfig;
+  }
+
+  @Override
+  public int getMaxRetries() {
+    return maxRetries;
+  }
+
+  private static int getInt(RemoteConfig rc, Config cfg, String name, int defValue) {
+    return cfg.getInt("remote", rc.getName(), name, defValue);
+  }
+
+  @Override
+  public int getSlowLatencyThreshold() {
+    return slowLatencyThreshold;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
new file mode 100644
index 0000000..f24d25e
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SourcesCollection.java
@@ -0,0 +1,181 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.replication.RemoteConfiguration;
+import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
+import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.eclipse.jgit.errors.ConfigInvalidException;
+import org.eclipse.jgit.storage.file.FileBasedConfig;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.URIish;
+
+@Singleton
+public class SourcesCollection implements ReplicationSources, ReplicationConfigValidator {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  private final Source.Factory sourceFactory;
+  private volatile List<Source> sources;
+  private boolean shuttingDown;
+
+  @Inject
+  public SourcesCollection(
+      ReplicationFileBasedConfig replicationConfig, Source.Factory sourceFactory, EventBus eventBus)
+      throws ConfigInvalidException {
+    this.sourceFactory = sourceFactory;
+    this.sources = allSources(sourceFactory, validateConfig(replicationConfig));
+    eventBus.register(this);
+  }
+
+  @Override
+  public List<Source> getAll() {
+    return sources.stream().filter(Objects::nonNull).collect(toList());
+  }
+
+  private List<Source> allSources(
+      Source.Factory sourceFactory, List<RemoteConfiguration> sourceConfigurations) {
+    return sourceConfigurations.stream()
+        .filter((c) -> c instanceof SourceConfiguration)
+        .map((c) -> (SourceConfiguration) c)
+        .map(sourceFactory::create)
+        .collect(toList());
+  }
+
+  @Override
+  public void startup(WorkQueue workQueue) {
+    shuttingDown = false;
+    for (Source cfg : sources) {
+      cfg.start(workQueue);
+    }
+  }
+
+  /* shutdown() cannot be set as a synchronized method because
+   * it may need to wait for pending events to complete;
+   * e.g. when enabling the drain of replication events before
+   * shutdown.
+   *
+   * As a rule of thumb for synchronized methods, because they
+   * implicitly define a critical section and associated lock,
+   * they should never hold waiting for another resource, otherwise
+   * the risk of deadlock is very high.
+   *
+   * See more background about deadlocks, what they are and how to
+   * prevent them at: https://en.wikipedia.org/wiki/Deadlock
+   */
+  @Override
+  public int shutdown() {
+    synchronized (this) {
+      shuttingDown = true;
+    }
+
+    int discarded = 0;
+    for (Source cfg : sources) {
+      discarded += cfg.shutdown();
+    }
+    return discarded;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return sources.isEmpty();
+  }
+
+  @Subscribe
+  public synchronized void onReload(List<RemoteConfiguration> sourceConfigurations) {
+    if (shuttingDown) {
+      logger.atWarning().log("Shutting down: configuration reload ignored");
+      return;
+    }
+
+    sources = allSources(sourceFactory, sourceConfigurations);
+    logger.atInfo().log("Configuration reloaded: %d sources", getAll().size());
+  }
+
+  @Override
+  public List<RemoteConfiguration> validateConfig(ReplicationFileBasedConfig newConfig)
+      throws ConfigInvalidException {
+
+    try {
+      newConfig.getConfig().load();
+    } catch (IOException e) {
+      throw new ConfigInvalidException(
+          String.format("Cannot read %s: %s", newConfig.getConfig().getFile(), e.getMessage()), e);
+    }
+
+    ImmutableList.Builder<RemoteConfiguration> sourceConfigs = ImmutableList.builder();
+    for (RemoteConfig c : allFetchRemotes(newConfig.getConfig())) {
+      if (c.getURIs().isEmpty()) {
+        continue;
+      }
+
+      // fetch source has to be specified.
+      if (c.getFetchRefSpecs().isEmpty()) {
+        throw new ConfigInvalidException(
+            String.format("You must specify a valid refSpec for this remote"));
+      }
+
+      SourceConfiguration sourceConfig = new SourceConfiguration(c, newConfig.getConfig());
+
+      if (!sourceConfig.isSingleProjectMatch()) {
+        for (URIish u : c.getURIs()) {
+          if (u.getPath() == null || !u.getPath().contains("${name}")) {
+            throw new ConfigInvalidException(
+                String.format("remote.%s.url \"%s\" lacks ${name} placeholder", c.getName(), u));
+          }
+        }
+      }
+      sourceConfigs.add(sourceConfig);
+    }
+    return sourceConfigs.build();
+  }
+
+  private static List<RemoteConfig> allFetchRemotes(FileBasedConfig cfg)
+      throws ConfigInvalidException {
+
+    Set<String> names = cfg.getSubsections("remote");
+    List<RemoteConfig> result = Lists.newArrayListWithCapacity(names.size());
+    for (String name : names) {
+      try {
+        final RemoteConfig remoteConfig = new RemoteConfig(cfg, name);
+        if (!remoteConfig.getFetchRefSpecs().isEmpty()) {
+          result.add(remoteConfig);
+        } else {
+          logger.atWarning().log(
+              "Skip loading of remote [remote \"%s\"], since it has no 'fetch' configuration",
+              name);
+        }
+      } catch (URISyntaxException e) {
+        throw new ConfigInvalidException(
+            String.format("remote %s has invalid URL in %s", name, cfg.getFile()));
+      }
+    }
+    return result;
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SshModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SshModule.java
new file mode 100644
index 0000000..9c89781
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/SshModule.java
@@ -0,0 +1,25 @@
+// Copyright (C) 2012 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.sshd.PluginCommandModule;
+
+class SshModule extends PluginCommandModule {
+  @Override
+  protected void configureCommands() {
+    command(StartFetchCommand.class);
+    command(ListCommand.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
new file mode 100644
index 0000000..bec0ffa
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
@@ -0,0 +1,120 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.sshd.CommandMetaData;
+import com.google.gerrit.sshd.SshCommand;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.ReplicationFilter;
+import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.Option;
+
+@RequiresCapability(StartReplicationCapability.START_REPLICATION)
+@CommandMetaData(
+    name = "start",
+    description = "Start replication for specific project or all projects")
+public final class StartFetchCommand extends SshCommand {
+  @Inject private PullReplicationStateLogger fetchStateLog;
+
+  @Option(name = "--all", usage = "fetch all known projects")
+  private boolean all;
+
+  @Option(name = "--url", metaVar = "PATTERN", usage = "pattern to match URL on")
+  private String urlMatch;
+
+  @Option(name = "--wait", usage = "wait for replication to finish before exiting")
+  private boolean wait;
+
+  @Option(name = "--now", usage = "start replication without waiting for replicationDelay")
+  private boolean now;
+
+  @Argument(index = 0, multiValued = true, metaVar = "PATTERN", usage = "project name pattern")
+  private List<String> projectPatterns = new ArrayList<>(2);
+
+  @Inject private FetchAll.Factory fetchFactory;
+
+  @Inject private ReplicationState.Factory fetchReplicationStateFactory;
+
+  @Override
+  protected void run() throws Failure {
+    if (all && projectPatterns.size() > 0) {
+      throw new UnloggedFailure(1, "error: cannot combine --all and PROJECT");
+    }
+
+    ReplicationState state =
+        fetchReplicationStateFactory.create(new FetchResultProcessing.CommandProcessing(this));
+    Future<?> future = null;
+
+    ReplicationFilter projectFilter;
+
+    if (all) {
+      projectFilter = ReplicationFilter.all();
+    } else {
+      projectFilter = new ReplicationFilter(projectPatterns);
+    }
+
+    future = fetchFactory.create(urlMatch, projectFilter, state, now).schedule(0, TimeUnit.SECONDS);
+
+    if (wait) {
+      if (future != null) {
+        try {
+          future.get();
+        } catch (InterruptedException e) {
+          fetchStateLog.error(
+              "Thread was interrupted while waiting for FetchAll operation to finish", e, state);
+          return;
+        } catch (ExecutionException e) {
+          fetchStateLog.error("An exception was thrown in FetchAll operation", e, state);
+          return;
+        }
+      }
+
+      if (state.hasFetchTask()) {
+        try {
+          state.waitForReplication();
+        } catch (InterruptedException e) {
+          writeStdErrSync("We are interrupted while waiting replication to complete");
+        }
+      } else {
+        writeStdOutSync("Nothing to replicate");
+      }
+    }
+  }
+
+  public void writeStdOutSync(String message) {
+    if (wait) {
+      synchronized (stdout) {
+        stdout.println(message);
+        stdout.flush();
+      }
+    }
+  }
+
+  public void writeStdErrSync(String message) {
+    if (wait) {
+      synchronized (stderr) {
+        stderr.println(message);
+        stderr.flush();
+      }
+    }
+  }
+}
diff --git a/src/main/resources/Documentation/about.md b/src/main/resources/Documentation/about.md
new file mode 100644
index 0000000..46ec740
--- /dev/null
+++ b/src/main/resources/Documentation/about.md
@@ -0,0 +1,10 @@
+This plugin can automatically mirror repositories from other systems.
+
+Typically replication should be done over SSH, with a passwordless
+public/private key pair.  On a trusted network it is also possible to
+use replication over the insecure (but much faster due to no
+authentication overhead or encryption) git:// protocol, by enabling
+the `upload-pack` service on the receiving system, but this
+configuration is not recommended.  It is also possible to specify a
+local path as replication source. This makes e.g. sense if a network
+share is mounted to which the repositories should be replicated from.
diff --git a/src/main/resources/Documentation/cmd-list.md b/src/main/resources/Documentation/cmd-list.md
new file mode 100644
index 0000000..7142ae1
--- /dev/null
+++ b/src/main/resources/Documentation/cmd-list.md
@@ -0,0 +1,74 @@
+@PLUGIN@ list
+==============
+
+NAME
+----
+@PLUGIN@ list - List remote destination information.
+
+SYNOPSIS
+--------
+```
+ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list
+  [--remote <PATTERN>]
+  [--detail]
+  [--json]
+```
+
+DESCRIPTION
+-----------
+Lists the name and URL for remote sources.
+
+ACCESS
+------
+Caller must be a member of the privileged 'Administrators' group.
+
+SCRIPTING
+---------
+This command is intended to be used in scripts.
+
+OPTIONS
+-------
+
+`--remote <PATTERN>`
+:	Only print information for sources whose remote name matches
+	the `PATTERN`.
+
+`--detail`
+:	Print additional detailed information: AdminUrl, AuthGroup, Project
+	and queue (pending and in-flight).
+
+`--json`
+:	Output in json format.
+
+EXAMPLES
+--------
+List all sources:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list
+```
+
+List all sources detail information:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --detail
+```
+
+List all sources detail information in json format:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --detail --json
+```
+
+List sources whose name contains mirror:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --remote mirror
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ list --remote ^.*mirror.*
+```
+
+SEE ALSO
+--------
+
+* [Replication Configuration](config.md)
+* [Access Control](../../../Documentation/access-control.html)
diff --git a/src/main/resources/Documentation/cmd-start.md b/src/main/resources/Documentation/cmd-start.md
new file mode 100644
index 0000000..51d0a53
--- /dev/null
+++ b/src/main/resources/Documentation/cmd-start.md
@@ -0,0 +1,100 @@
+@PLUGIN@ start
+==============
+
+NAME
+----
+@PLUGIN@ start - Manually trigger replication, to recover a node
+
+SYNOPSIS
+--------
+```
+ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start
+  [--now]
+  [--wait]
+  [--url <PATTERN>]
+  {--all | <PROJECT PATTERN> ...}
+```
+
+DESCRIPTION
+-----------
+Schedules pull replication of the specified projects to all configured
+replication sources, or only those whose URLs match the pattern
+given on the command line.
+
+If you get message "Nothing to replicate" while running this command,
+it may be caused by several reasons, such as you give a wrong url
+pattern in command options, or the authGroup in the @PLUGIN@.config
+has no read access for the replicated projects.
+
+If one or several project patterns are supplied, only those projects
+conforming to both this/these pattern(s) and those defined in
+@PLUGIN@.config for the target host(s) are queued for replication.
+
+The patterns follow the same format as those in @PLUGIN@.config,
+where wildcard or regular expression patterns can be given.
+Regular expression patterns must match a complete project name to be
+considered a match.
+
+A regular expression pattern starts with `^` and a wildcard pattern ends
+with a `*`. If the pattern starts with `^` and ends with `*`, it is
+treated as a regular expression.
+
+ACCESS
+------
+Caller must be a member of the privileged 'Administrators' group,
+or have been granted the 'Start Replication' plugin-owned capability.
+
+SCRIPTING
+---------
+This command is intended to be used in scripts.
+
+OPTIONS
+-------
+
+`--now`
+:   Start replicating right away without waiting the per remote
+	replication delay.
+
+`--wait`
+:	Wait for replication to finish before exiting.
+
+`--all`
+:	Schedule replication for all projects.
+
+`--url <PATTERN>`
+:	Replicate only from replication sources whose URL contains
+	the substring `PATTERN`.  This can be useful to replicate
+	only from a previously down node, which has been brought back
+	online.
+
+EXAMPLES
+--------
+Replicate every project, from every configured remote:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --all
+```
+
+Replicate only from `srv2` now that it is back online:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url srv2 --all
+```
+
+Replicate only projects located in the `documentation` subdirectory:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start documentation/*
+```
+
+Replicate projects whose path includes a folder named `vendor` to host slave1:
+
+```
+  $ ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ start --url slave1 ^(|.*/)vendor(|/.*)
+```
+
+SEE ALSO
+--------
+
+* [Replication Configuration](config.md)
+* [Access Control](../../../Documentation/access-control.html)
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
new file mode 100644
index 0000000..aa59a2d
--- /dev/null
+++ b/src/main/resources/Documentation/config.md
@@ -0,0 +1,329 @@
+@PLUGIN@ Configuration
+=========================
+
+Enabling Replication
+--------------------
+
+If replicating over SSH, ensure the host key of the
+remote system(s) is already in the Gerrit user's `~/.ssh/known_hosts`
+file.  The easiest way to add the host key is to connect once by hand
+with the command line:
+
+```
+  sudo su -c 'ssh mirror1.us.some.org echo' gerrit2
+```
+
+<a name="example_file">
+Next, create `$site_path/etc/@PLUGIN@.config` as a Git-style config
+file, for example to replicate in parallel from four different hosts:</a>
+
+```
+  [remote "host-one"]
+    url = gerrit2@host-one.example.com:/some/path/${name}.git
+
+  [remote "pubmirror"]
+    url = mirror1.us.some.org:/pub/git/${name}.git
+    url = mirror2.us.some.org:/pub/git/${name}.git
+    url = mirror3.us.some.org:/pub/git/${name}.git
+    fetch = +refs/heads/*:refs/heads/*
+    fetch = +refs/tags/*:refs/tags/*
+    threads = 3
+    authGroup = Public Mirror Group
+    authGroup = Second Public Mirror Group
+```
+
+Then reload the replication plugin to pick up the new configuration:
+
+```
+  ssh -p 29418 localhost gerrit plugin reload @PLUGIN@
+```
+
+To manually trigger replication at runtime, see
+SSH command [start](cmd-start.md).
+
+File `@PLUGIN@.config`
+-------------------------
+
+The optional file `$site_path/etc/@PLUGIN@.config` is a Git-style
+config file that controls the replication settings for the replication
+plugin.
+
+The file is composed of one or more `remote` sections, each remote
+section provides common configuration settings for one or more
+source URLs.
+
+Each remote section uses its own thread pool.  If fetching from
+multiple remotes, over differing types of network connections
+(e.g. LAN and also public Internet), its a good idea to put them
+into different remote sections, so that replication to the slower
+connection does not starve out the faster local one.  The example
+file above does this.
+
+In the keys below, the `NAME` portion is unused by this plugin, but
+must be unique to distinguish the different sections if more than one
+remote section appears in the file.
+
+gerrit.replicateOnStartup
+:	If true, replicates from all remotes on startup to ensure they
+	are in-sync with this server.  By default, false.
+
+gerrit.autoReload
+:	If true, automatically reloads replication sources and settings
+	after `replication.config` file is updated, without the need to restart
+	the replication plugin. When the reload takes place, pending replication
+	events based on old settings are discarded. By default, false.
+
+replication.lockErrorMaxRetries
+:	Number of times to retry a replication operation if a lock
+	error is detected.
+
+	If two or more replication operations (to the same GIT and Ref)
+	are scheduled at approximately the same time (and end up on different
+	replication threads), there is a large probability that the last
+	fetch to complete will fail with a remote "failure to lock" error.
+	This option allows Gerrit to retry the replication fetch when the
+	"failure to lock" error is detected.
+
+	A good value would be 3 retries or less, depending on how often
+	you see lockError collisions in your server logs. A too highly set
+	value risks keeping around the replication operations in the queue
+	for a long time, and the number of items in the queue will increase
+	with time.
+
+	Normally Gerrit will succeed with the replication during its first
+	retry, but in certain edge cases (e.g. a mirror introduces a ref
+	namespace with the same name as a branch on the master) the retry
+	will never succeed.
+
+	The issue can also be mitigated somewhat by increasing the
+	replicationDelay.
+
+	Default: 0 (disabled, i.e. never retry)
+
+replication.maxRetries
+:	Maximum number of times to retry a fetch operation that previously
+	failed.
+
+	When a fetch operation reaches its maximum number of retries,
+	the replication event is discarded from the queue.
+
+	Can be overridden at remote-level by setting replicationMaxRetries.
+
+	By default, fetchs are retried indefinitely.
+
+remote.NAME.url
+:	Address of the remote server to fetch from.  Multiple URLs may be
+	specified within a single remote block, listing different
+	destinations which share the same settings.  Assuming
+	sufficient threads in the thread pool, Gerrit fetchs from all
+	URLs in parallel, using one thread per URL.
+
+    TODO: This doesn't apply here: you can only replicate from one url
+
+	Within each URL value the magic placeholder `${name}` is
+	replaced with the Gerrit project name.  This is a Gerrit
+	specific extension to the otherwise standard Git URL syntax
+	and it must be included in each URL so that Gerrit can figure
+	out where each project needs to be replicated. `${name}` may
+	only be omitted if the remote refers to a single repository
+	(i.e.: Exactly one [remote.NAME.projects][3] and that name's
+	value is a single project match.).
+
+	See [git fetch][1] for details on Git URL syntax.
+
+[1]: http://www.git-scm.com/docs/git-fetch#URLS
+[3]: #remote.NAME.projects
+
+remote.NAME.uploadpack
+:	Path of the `git-upload-pack` executable on the remote system,
+	if using the SSH transport.
+
+	Defaults to `git-upload-pack`.
+
+remote.NAME.fetch
+:	Standard Git refspec denoting what should be replicated.
+	Setting this to `+refs/heads/*:refs/heads/*` would mirror only
+	the active branches, but not the change refs under
+	`refs/changes/`, or the tags under `refs/tags/`.
+
+	Multiple fetch keys can be supplied, to specify multiple
+	patterns to match against.  In the [example above][2], remote
+	"pubmirror" uses two fetch keys to match both `refs/heads/*`
+	and `refs/tags/*`, but excludes all others, including
+	`refs/changes/*`.
+
+	Note that the `refs/meta/config` branch is only replicated
+	when `replicatePermissions` is true, even if the push refspec
+	is 'all refs'.
+
+[2]: #example_file
+
+remote.NAME.timeout
+:	Number of seconds to wait for a network read or write to
+	complete before giving up and declaring the remote side is not
+	responding.  If 0, there is no timeout, and the push client
+	waits indefinitely.
+
+	A timeout should be large enough to mostly transfer the
+	objects from the other side.  1 second may be too small for
+	larger projects, especially over a WAN link, while 10-30
+	seconds is a much more reasonable timeout value.
+
+	Defaults to 0 seconds, wait indefinitely.
+
+remote.NAME.rescheduleDelay
+:	Delay when rescheduling a fetch operation due to an in-flight fetch
+	running for the same project.
+
+	Cannot be set to a value lower than 3 seconds to avoid a tight loop
+	of schedule/run which could cause 1K+ retries per second.
+
+	A configured value lower than 3 seconds will be rounded to 3 seconds.
+
+	By default, 3 seconds.
+
+remote.NAME.replicationRetry
+:	Time to wait before scheduling a remote fetch operation previously
+	failed due to a remote server error.
+
+	If a remote fetch operation fails because a remote server was
+	offline, all fetch operations from the same source URL are
+	blocked, and the remote fetch is continuously retried unless
+	the replicationMaxRetries value is set.
+
+	This is a Gerrit specific extension to the Git remote block.
+
+	By default, 1 minute.
+
+remote.NAME.replicationMaxRetries
+:	Maximum number of times to retry a fetch operation that previously
+	failed.
+
+	When a fetch operation reaches its maximum number of retries
+	the replication event is discarded from the queue.
+
+	This is a Gerrit specific extension to the Git remote block.
+
+	By default, use replication.maxRetries.
+
+remote.NAME.threads
+:	Number of worker threads to dedicate to fetching to the
+	repositories described by this remote.  Each thread can fetch
+	one project at a time, from one source URL.  Scheduling
+	within the thread pool is done on a per-project basis.  If a
+	remote block describes 4 URLs, allocating 4 threads in the
+	pool will permit some level of parallel fetching.
+
+	By default, 1 thread.
+
+remote.NAME.authGroup
+:	Specifies the name of a group that the remote should use to
+	access the repositories. Multiple authGroups may be specified
+	within a single remote block to signify a wider access right.
+	In the project administration web interface the read access
+	can be specified for this group to control if a project should
+	be replicated or not to the remote.
+
+	By default, replicates without group control, i.e. replicates
+	everything from all remotes.
+
+remote.NAME.remoteNameStyle
+:	Provides possibilities to influence the name of the source
+	repository, e.g. by replacing slashes in the `${name}`
+	placeholder.
+
+	Github and Gitorious do not permit slashes "/" in repository
+	names and will change them to dashes "-" at repository creation
+	time.
+
+	If this setting is set to "dash", slashes will be replaced with
+	dashes in the remote repository name. If set to "underscore",
+	slashes will be replaced with underscores in the repository name.
+
+	Option "basenameOnly" makes `${name}` to be only the basename
+	(the part after the last slash) of the repository path on the
+	Gerrit server, e.g. `${name}` of `foo/bar/my-repo.git` would
+	be `my-repo`.
+
+	By default, "slash", i.e. remote names will contain slashes as
+	they do in Gerrit.
+
+<a name="remote.NAME.projects">remote.NAME.projects</a>
+:	Specifies which repositories should be replicated from the
+	remote. It can be provided more than once, and supports three
+	formats: regular expressions, wildcard matching, and single
+	project matching. All three formats match case-sensitive.
+
+	Values starting with a caret `^` are treated as regular
+	expressions. `^foo/(bar|baz)` would match the projects
+	`foo/bar`, and `foo/baz`. Regular expressions have to fully
+	match the project name. So the above example would not match
+	`foo/bar2`, while `^foo/(bar|baz).*` would.
+
+	Projects may be excluded from replication by using a regular
+	expression with inverse match. `^(?:(?!PATTERN).)*$` will
+	exclude any project that matches.
+
+	Values that are not regular expressions and end in `*` are
+	treated as wildcard matches. Wildcards match projects whose
+	name agrees from the beginning until the trailing `*`. So
+	`foo/b*` would match the projects `foo/b`, `foo/bar`, and
+	`foo/baz`, but neither `foobar`, nor `bar/foo/baz`.
+
+	Values that are neither regular expressions nor wildcards are
+	treated as single project matches. So `foo/bar` matches only
+	the project `foo/bar`, but no other project.
+
+	By default, replicates without matching, i.e. replicates
+	everything from all remotes.
+
+File `secure.config`
+--------------------
+
+The optional file `$site_path/secure.config` is a Git-style config
+file that provides secure values that should not be world-readable,
+such as passwords. Passwords for HTTP remotes can be obtained from
+this file.
+
+remote.NAME.username
+:	Username to use for HTTP authentication on this remote, if not
+	given in the URL.
+
+remote.NAME.password
+:	Password to use for HTTP authentication on this remote.
+
+File `~/.ssh/config`
+--------------------
+
+If present, Gerrit reads and caches `~/.ssh/config` at startup, and
+supports most SSH configuration options.  For example:
+
+```
+  Host host-one.example.com
+    IdentityFile ~/.ssh/id_hostone
+    PreferredAuthentications publickey
+
+  Host mirror*.us.some.org
+    User mirror-updater
+    IdentityFile ~/.ssh/id_pubmirror
+    PreferredAuthentications publickey
+```
+
+Supported options:
+
+  * Host
+  * Hostname
+  * User
+  * Port
+  * IdentityFile
+  * PreferredAuthentications
+  * StrictHostKeyChecking
+
+SSH authentication must be by passwordless public key, as there is no
+facility to read passphrases on startup or passwords during the SSH
+connection setup, and SSH agents are not supported from Java.
+
+Host keys for any destination SSH servers must appear in the user's
+`~/.ssh/known_hosts` file, and must be added in advance, before Gerrit
+starts.  If a host key is not listed, Gerrit will be unable to connect
+to that destination, and replication to that URL will fail.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
new file mode 100644
index 0000000..f0f1c54
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchGitUpdateProcessingTest.java
@@ -0,0 +1,88 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.gerrit.server.events.EventDispatcher;
+import com.google.gerrit.server.permissions.PermissionBackendException;
+import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing.GitUpdateProcessing;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState.RefFetchResult;
+import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FetchGitUpdateProcessingTest {
+  private EventDispatcher dispatcherMock;
+  private GitUpdateProcessing gitUpdateProcessing;
+
+  @Before
+  public void setUp() throws Exception {
+    dispatcherMock = mock(EventDispatcher.class);
+    gitUpdateProcessing = new GitUpdateProcessing(dispatcherMock);
+  }
+
+  @Test
+  public void headRefReplicated() throws URISyntaxException, PermissionBackendException {
+    FetchRefReplicatedEvent expectedEvent =
+        new FetchRefReplicatedEvent(
+            "someProject",
+            "refs/heads/master",
+            "someHost",
+            RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+
+    gitUpdateProcessing.onOneProjectReplicationDone(
+        "someProject",
+        "refs/heads/master",
+        new URIish("git://someHost/someProject.git"),
+        RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
+  }
+
+  @Test
+  public void changeRefReplicated() throws URISyntaxException, PermissionBackendException {
+    FetchRefReplicatedEvent expectedEvent =
+        new FetchRefReplicatedEvent(
+            "someProject",
+            "refs/changes/01/1/1",
+            "someHost",
+            RefFetchResult.FAILED,
+            RefUpdate.Result.REJECTED_OTHER_REASON);
+
+    gitUpdateProcessing.onOneProjectReplicationDone(
+        "someProject",
+        "refs/changes/01/1/1",
+        new URIish("git://someHost/someProject.git"),
+        RefFetchResult.FAILED,
+        RefUpdate.Result.REJECTED_OTHER_REASON);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedEvent));
+  }
+
+  @Test
+  public void onAllNodesReplicated() throws PermissionBackendException {
+    FetchRefReplicationDoneEvent expectedDoneEvent =
+        new FetchRefReplicationDoneEvent("someProject", "refs/heads/master", 5);
+
+    gitUpdateProcessing.onRefReplicatedFromAllNodes("someProject", "refs/heads/master", 5);
+    verify(dispatcherMock, times(1)).postEvent(eq(expectedDoneEvent));
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
new file mode 100644
index 0000000..c1c0981
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/FetchReplicationTest.java
@@ -0,0 +1,46 @@
+// Copyright (C) 2011 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.googlesource.gerrit.plugins.replication.pull.Source.encode;
+import static com.googlesource.gerrit.plugins.replication.pull.Source.needsUrlEncoding;
+
+import java.net.URISyntaxException;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Test;
+
+public class FetchReplicationTest {
+
+  @Test
+  public void testNeedsUrlEncoding() throws URISyntaxException {
+    assertThat(needsUrlEncoding(new URIish("http://host/path"))).isTrue();
+    assertThat(needsUrlEncoding(new URIish("https://host/path"))).isTrue();
+    assertThat(needsUrlEncoding(new URIish("amazon-s3://config/bucket/path"))).isTrue();
+
+    assertThat(needsUrlEncoding(new URIish("host:path"))).isFalse();
+    assertThat(needsUrlEncoding(new URIish("user@host:path"))).isFalse();
+    assertThat(needsUrlEncoding(new URIish("git://host/path"))).isFalse();
+    assertThat(needsUrlEncoding(new URIish("ssh://host/path"))).isFalse();
+  }
+
+  @Test
+  public void urlEncoding() {
+    assertThat(encode("foo/bar/thing")).isEqualTo("foo/bar/thing");
+    assertThat(encode("-- All Projects --")).isEqualTo("--%20All%20Projects%20--");
+    assertThat(encode("name/with a space")).isEqualTo("name/with%20a%20space");
+    assertThat(encode("name\nwith-LF")).isEqualTo("name%0Awith-LF");
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateTest.java
new file mode 100644
index 0000000..eb52493
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationStateTest.java
@@ -0,0 +1,222 @@
+// Copyright (C) 2013 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.net.URISyntaxException;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationStateTest {
+
+  private ReplicationState replicationState;
+  private FetchResultProcessing fetchResultProcessingMock;
+
+  @Before
+  public void setUp() throws Exception {
+    fetchResultProcessingMock = mock(FetchResultProcessing.class);
+    replicationState = new ReplicationState(fetchResultProcessingMock);
+  }
+
+  @Test
+  public void shouldNotHavePushTask() {
+    assertThat(replicationState.hasFetchTask()).isFalse();
+  }
+
+  @Test
+  public void shouldHavePushTask() {
+    replicationState.increaseFetchTaskCount("someProject", "someRef");
+    assertThat(replicationState.hasFetchTask()).isTrue();
+  }
+
+  @Test
+  public void shouldFireOneReplicationEventWhenNothingToReplicate() {
+    // actual test
+    replicationState.markAllFetchTasksScheduled();
+
+    // expected event
+    verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(0);
+  }
+
+  @Test
+  public void shouldFireEventsForReplicationOfOneRefToOneNode() throws URISyntaxException {
+    URIish uri = new URIish("git://someHost/someRepo.git");
+
+    // actual test
+    replicationState.increaseFetchTaskCount("someProject", "someRef");
+    replicationState.markAllFetchTasksScheduled();
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "someRef",
+        uri,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+
+    // expected events
+    fetchResultProcessingMock.onOneProjectReplicationDone(
+        "someProject",
+        "someRef",
+        uri,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+
+    verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "someRef", 1);
+    verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(1);
+  }
+
+  @Test
+  public void shouldFireEventsForReplicationOfMultipleRefsToMultipleNodes()
+      throws URISyntaxException {
+    URIish uri1 = new URIish("git://host1/someRepo.git");
+    URIish uri2 = new URIish("git://host2/someRepo.git");
+    URIish uri3 = new URIish("git://host3/someRepo.git");
+
+    // actual test
+    replicationState.increaseFetchTaskCount("someProject", "ref1");
+    replicationState.increaseFetchTaskCount("someProject", "ref1");
+    replicationState.increaseFetchTaskCount("someProject", "ref1");
+    replicationState.increaseFetchTaskCount("someProject", "ref2");
+    replicationState.increaseFetchTaskCount("someProject", "ref2");
+    replicationState.markAllFetchTasksScheduled();
+
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref1",
+        uri1,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref1",
+        uri2,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref1",
+        uri3,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref2",
+        uri1,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref2",
+        uri2,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+
+    // expected events
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref1",
+            uri1,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref1",
+            uri2,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref1",
+            uri3,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref2",
+            uri1,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref2",
+            uri2,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+
+    verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref1", 3);
+    verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref2", 2);
+    verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(5);
+  }
+
+  @Test
+  public void shouldFireEventsWhenSomeReplicationCompleteBeforeAllTasksAreScheduled()
+      throws URISyntaxException {
+    URIish uri1 = new URIish("git://host1/someRepo.git");
+
+    // actual test
+    replicationState.increaseFetchTaskCount("someProject", "ref1");
+    replicationState.increaseFetchTaskCount("someProject", "ref2");
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref1",
+        uri1,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    replicationState.notifyRefReplicated(
+        "someProject",
+        "ref2",
+        uri1,
+        ReplicationState.RefFetchResult.SUCCEEDED,
+        RefUpdate.Result.NEW);
+    replicationState.markAllFetchTasksScheduled();
+
+    // expected events
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref1",
+            uri1,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(fetchResultProcessingMock)
+        .onOneProjectReplicationDone(
+            "someProject",
+            "ref2",
+            uri1,
+            ReplicationState.RefFetchResult.SUCCEEDED,
+            RefUpdate.Result.NEW);
+    verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref1", 1);
+    verify(fetchResultProcessingMock).onRefReplicatedFromAllNodes("someProject", "ref2", 1);
+    verify(fetchResultProcessingMock).onAllRefsReplicatedFromAllNodes(2);
+  }
+
+  @Test
+  public void toStringRefPushResult() throws Exception {
+    assertEquals("failed", ReplicationState.RefFetchResult.FAILED.toString());
+    assertEquals("not-attempted", ReplicationState.RefFetchResult.NOT_ATTEMPTED.toString());
+    assertEquals("succeeded", ReplicationState.RefFetchResult.SUCCEEDED.toString());
+  }
+}