RestApi service to fetch based on object id

Pull replication rest api allows to notify service to fetch from remote
specific ref update by object id. This will allow to implement pull
replication functionality.

Feature: Issue 11605
Change-Id: I81793e6aeb6e5ffc4679ace7b282948d9720f30f
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Command.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Command.java
new file mode 100644
index 0000000..19069da
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Command.java
@@ -0,0 +1,7 @@
+package com.googlesource.gerrit.plugins.replication.pull;
+
+public interface Command {
+  public void writeStdOutSync(String message);
+
+  public void writeStdErrSync(String message);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
index 7ee0da1..4c6edb0 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchOne.java
@@ -62,7 +62,7 @@
  * <p>Instance members are protected by the lock within FetchQueue. Callers must take that lock to
  * ensure they are working with a current view of the object.
  */
-class FetchOne implements ProjectRunnable, CanceledWhileRunning {
+public class FetchOne implements ProjectRunnable, CanceledWhileRunning {
   private final ReplicationStateListener stateLog;
   static final String ALL_REFS = "..all..";
   static final String ID_MDC_KEY = "fetchOneId";
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
index 202d7e6..cffc1f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/FetchResultProcessing.java
@@ -69,10 +69,10 @@
   }
 
   public static class CommandProcessing extends FetchResultProcessing {
-    private WeakReference<StartFetchCommand> sshCommand;
+    private WeakReference<Command> sshCommand;
     private AtomicBoolean hasError = new AtomicBoolean();
 
-    CommandProcessing(StartFetchCommand sshCommand) {
+    public CommandProcessing(Command sshCommand) {
       this.sshCommand = new WeakReference<>(sshCommand);
     }
 
@@ -106,9 +106,11 @@
           sb.append("UNKNOWN RESULT!");
           break;
       }
-      sb.append(" (");
-      sb.append(refUpdateResult.toString());
-      sb.append(")");
+      if (refUpdateResult != null) {
+        sb.append(" (");
+        sb.append(refUpdateResult.toString());
+        sb.append(")");
+      }
       writeStdOut(sb.toString());
     }
 
@@ -140,7 +142,7 @@
 
     @Override
     void writeStdOut(String message) {
-      StartFetchCommand command = sshCommand.get();
+      Command command = sshCommand.get();
       if (command != null) {
         command.writeStdOutSync(message);
       }
@@ -148,7 +150,7 @@
 
     @Override
     void writeStdErr(String message) {
-      StartFetchCommand command = sshCommand.get();
+      Command command = sshCommand.get();
       if (command != null) {
         command.writeStdErrSync(message);
       }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
index 236c8b6..6eb1d30 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationModule.java
@@ -37,6 +37,7 @@
 import com.googlesource.gerrit.plugins.replication.ReplicationConfigValidator;
 import com.googlesource.gerrit.plugins.replication.ReplicationFileBasedConfig;
 import com.googlesource.gerrit.plugins.replication.StartReplicationCapability;
+import com.googlesource.gerrit.plugins.replication.pull.api.PullReplicationApiModule;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
@@ -54,6 +55,8 @@
 
   @Override
   protected void configure() {
+
+    install(new PullReplicationApiModule());
     install(new FactoryModuleBuilder().build(Source.Factory.class));
     bind(FetchReplicationMetrics.class).in(Scopes.SINGLETON);
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
index 7e60323..a62f369 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/PullReplicationStateLogger.java
@@ -26,7 +26,7 @@
  * state to the stderr console.
  */
 @Singleton
-class PullReplicationStateLogger implements ReplicationStateListener {
+public class PullReplicationStateLogger implements ReplicationStateListener {
 
   @Override
   public void warn(String msg, ReplicationState... states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
index 5044496..ed4751f 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/ReplicationState.java
@@ -19,6 +19,7 @@
 import com.google.inject.assistedinject.Assisted;
 import com.google.inject.assistedinject.AssistedInject;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.eclipse.jgit.lib.RefUpdate;
@@ -158,6 +159,10 @@
     allFetchTasksFinished.await();
   }
 
+  public void waitForReplication(long timeout) throws InterruptedException {
+    allFetchTasksFinished.await(timeout, TimeUnit.SECONDS);
+  }
+
   public void writeStdOut(String message) {
     fetchResultProcessing.writeStdOut(message);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
index dbe8459..a2c70df 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/Source.java
@@ -64,8 +64,12 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FilenameUtils;
@@ -85,7 +89,7 @@
   }
 
   private final ReplicationStateListener stateLog;
-  private final Object stateLock = new Object();
+  private final Map<Project.NameKey, Object> stateLock = new ConcurrentHashMap<>();
   private final Map<URIish, FetchOne> pending = new HashMap<>();
   private final Map<URIish, FetchOne> inFlight = new HashMap<>();
   private final FetchOne.Factory opFactory;
@@ -319,20 +323,23 @@
     return false;
   }
 
-  void schedule(Project.NameKey project, String ref, URIish uri, ReplicationState state) {
-    schedule(project, ref, uri, state, false);
+  public Future<?> schedule(
+      Project.NameKey project, String ref, ReplicationState state, boolean now) {
+    URIish uri = getURI(project);
+    return schedule(project, ref, uri, state, now);
   }
 
-  void schedule(
+  public Future<?> schedule(
       Project.NameKey project, String ref, URIish uri, ReplicationState state, boolean now) {
+
     repLog.info("scheduling replication {}:{} => {}", uri, ref, project);
     if (!shouldReplicate(project, ref, state)) {
-      return;
+      return CompletableFuture.completedFuture(null);
     }
 
     if (!config.replicatePermissions()) {
       FetchOne e;
-      synchronized (stateLock) {
+      synchronized (stateLock.getOrDefault(project, new Object())) {
         e = pending.get(uri);
       }
       if (e == null) {
@@ -342,26 +349,27 @@
             if (head != null
                 && head.isSymbolic()
                 && RefNames.REFS_CONFIG.equals(head.getLeaf().getName())) {
-              return;
+              return CompletableFuture.completedFuture(null);
             }
           } catch (IOException err) {
             stateLog.error(String.format("cannot check type of project %s", project), err, state);
-            return;
+            return CompletableFuture.completedFuture(null);
           }
         } catch (IOException err) {
           stateLog.error(String.format("source project %s not available", project), err, state);
-          return;
+          return CompletableFuture.completedFuture(null);
         }
       }
     }
 
-    synchronized (stateLock) {
+    synchronized (stateLock.getOrDefault(project, new Object())) {
       FetchOne e = pending.get(uri);
+      Future<?> f = CompletableFuture.completedFuture(null);
       if (e == null) {
         e = opFactory.create(project, uri);
         addRef(e, ref);
         e.addState(ref, state);
-        pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        f = pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
         pending.put(uri, e);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
@@ -369,11 +377,12 @@
       }
       state.increaseFetchTaskCount(project.get(), ref);
       repLog.info("scheduled {}:{} => {} to run after {}s", e, ref, project, config.getDelay());
+      return f;
     }
   }
 
   void fetchWasCanceled(FetchOne fetchOp) {
-    synchronized (stateLock) {
+    synchronized (stateLock.getOrDefault(fetchOp.getProjectNameKey(), new Object())) {
       URIish uri = fetchOp.getURI();
       pending.remove(uri);
     }
@@ -522,30 +531,54 @@
     List<URIish> r = Lists.newArrayListWithCapacity(config.getRemoteConfig().getURIs().size());
     for (URIish uri : config.getRemoteConfig().getURIs()) {
       if (matches(uri, urlMatch)) {
-        String name = project.get();
-        if (needsUrlEncoding(uri)) {
-          name = encode(name);
-        }
-        String remoteNameStyle = config.getRemoteNameStyle();
-        if (remoteNameStyle.equals("dash")) {
-          name = name.replace("/", "-");
-        } else if (remoteNameStyle.equals("underscore")) {
-          name = name.replace("/", "_");
-        } else if (remoteNameStyle.equals("basenameOnly")) {
-          name = FilenameUtils.getBaseName(name);
-        } else if (!remoteNameStyle.equals("slash")) {
-          repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
-        }
-        String replacedPath = replaceName(uri.getPath(), name, isSingleProjectMatch());
-        if (replacedPath != null) {
-          uri = uri.setPath(replacedPath);
-          r.add(uri);
-        }
+        Optional<String> replacedPath = convertToPath(project, uri);
+        replacedPath.ifPresent(
+            path -> {
+              r.add(uri.setPath(path));
+            });
       }
     }
     return r;
   }
 
+  public URIish getURI(Project.NameKey project) {
+    if (config.getRemoteConfig().getURIs().size() != 1) {
+      throw new IllegalStateException(
+          String.format(
+              "Pull replication source %s must have only one url property.", project.get()));
+    }
+
+    URIish uri = config.getRemoteConfig().getURIs().get(0);
+    Optional<String> replacedPathOpt = convertToPath(project, uri);
+    String replacedPath =
+        replacedPathOpt.orElseThrow(
+            () ->
+                new IllegalStateException(
+                    String.format(
+                        "Remote config %s url %s does not contain ${name} field",
+                        config.getRemoteConfig().getName(), uri.getPath())));
+
+    return uri.setPath(replacedPath);
+  }
+
+  private Optional<String> convertToPath(Project.NameKey project, URIish uri) {
+    String name = project.get();
+    if (needsUrlEncoding(uri)) {
+      name = encode(name);
+    }
+    String remoteNameStyle = config.getRemoteNameStyle();
+    if (remoteNameStyle.equals("dash")) {
+      name = name.replace("/", "-");
+    } else if (remoteNameStyle.equals("underscore")) {
+      name = name.replace("/", "_");
+    } else if (remoteNameStyle.equals("basenameOnly")) {
+      name = FilenameUtils.getBaseName(name);
+    } else if (!remoteNameStyle.equals("slash")) {
+      repLog.debug("Unknown remoteNameStyle: {}, falling back to slash", remoteNameStyle);
+    }
+    return Optional.ofNullable(replaceName(uri.getPath(), name, isSingleProjectMatch()));
+  }
+
   static boolean needsUrlEncoding(URIish uri) {
     return "http".equalsIgnoreCase(uri.getScheme())
         || "https".equalsIgnoreCase(uri.getScheme())
@@ -585,10 +618,14 @@
     return config.getLockErrorMaxRetries();
   }
 
-  String getRemoteConfigName() {
+  public String getRemoteConfigName() {
     return config.getRemoteConfig().getName();
   }
 
+  public int getTimeout() {
+    return config.getRemoteConfig().getTimeout();
+  }
+
   public int getMaxRetries() {
     return config.getMaxRetries();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
index bec0ffa..a3497e6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/StartFetchCommand.java
@@ -32,7 +32,7 @@
 @CommandMetaData(
     name = "start",
     description = "Start replication for specific project or all projects")
-public final class StartFetchCommand extends SshCommand {
+public final class StartFetchCommand extends SshCommand implements Command {
   @Inject private PullReplicationStateLogger fetchStateLog;
 
   @Option(name = "--all", usage = "fetch all known projects")
@@ -100,6 +100,7 @@
     }
   }
 
+  @Override
   public void writeStdOutSync(String message) {
     if (wait) {
       synchronized (stdout) {
@@ -109,6 +110,7 @@
     }
   }
 
+  @Override
   public void writeStdErrSync(String message) {
     if (wait) {
       synchronized (stderr) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
new file mode 100644
index 0000000..ce02d56
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchAction.java
@@ -0,0 +1,133 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Strings;
+import com.google.common.flogger.FluentLogger;
+import com.google.gerrit.entities.Project;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.RestModifyView;
+import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
+import com.google.gerrit.server.config.UrlFormatter;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.ioutil.HexFormat;
+import com.google.gerrit.server.project.ProjectResource;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.api.FetchAction.Input;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class FetchAction implements RestModifyView<ProjectResource, Input> {
+  private final FetchCommand command;
+  private final WorkQueue workQueue;
+  private final DynamicItem<UrlFormatter> urlFormatter;
+
+  @Inject
+  public FetchAction(
+      FetchCommand command, WorkQueue workQueue, DynamicItem<UrlFormatter> urlFormatter) {
+    this.command = command;
+    this.workQueue = workQueue;
+    this.urlFormatter = urlFormatter;
+  }
+
+  public static class Input {
+    public String label;
+    public String objectId;
+    public boolean async;
+  }
+
+  @Override
+  public Response<?> apply(ProjectResource resource, Input input) throws RestApiException {
+    try {
+      if (Strings.isNullOrEmpty(input.label)) {
+        throw new BadRequestException("Source label cannot be null or empty");
+      }
+
+      if (Strings.isNullOrEmpty(input.objectId)) {
+        throw new BadRequestException("Ref-update objectId cannot be null or empty");
+      }
+
+      if (input.async) {
+        return applyAsync(resource.getNameKey(), input);
+      }
+      return applySync(resource.getNameKey(), input);
+    } catch (InterruptedException
+        | ExecutionException
+        | IllegalStateException
+        | TimeoutException e) {
+      throw new RestApiException(e.getMessage(), e);
+    } catch (RemoteConfigurationMissingException e) {
+      throw new UnprocessableEntityException(e.getMessage());
+    }
+  }
+
+  private Response<?> applySync(Project.NameKey project, Input input)
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    command.fetch(project, input.label, input.objectId);
+    return Response.created(input);
+  }
+
+  private Response.Accepted applyAsync(Project.NameKey project, Input input) {
+    @SuppressWarnings("unchecked")
+    WorkQueue.Task<Void> task =
+        (WorkQueue.Task<Void>)
+            workQueue.getDefaultQueue().submit(new FetchJob(command, project, input));
+    Optional<String> url =
+        urlFormatter
+            .get()
+            .getRestUrl("a/config/server/tasks/" + HexFormat.fromInt(task.getTaskId()));
+    // We're in a HTTP handler, so must be present.
+    checkState(url.isPresent());
+    return Response.accepted(url.get());
+  }
+
+  private static class FetchJob implements Runnable {
+    private static final FluentLogger log = FluentLogger.forEnclosingClass();
+
+    private FetchCommand command;
+    private Project.NameKey project;
+    private FetchAction.Input input;
+
+    public FetchJob(FetchCommand command, Project.NameKey project, FetchAction.Input input) {
+      this.command = command;
+      this.project = project;
+      this.input = input;
+    }
+
+    @Override
+    public void run() {
+      try {
+        command.fetch(project, input.label, input.objectId);
+      } catch (InterruptedException
+          | ExecutionException
+          | RemoteConfigurationMissingException
+          | TimeoutException e) {
+        log.atSevere().withCause(e).log(
+            "Exception during the async fetch call for project {}, label {} and object id {}",
+            project.get(),
+            input.label,
+            input.objectId);
+      }
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
new file mode 100644
index 0000000..749adc2
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommand.java
@@ -0,0 +1,86 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import com.google.gerrit.entities.Project;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.replication.pull.Command;
+import com.googlesource.gerrit.plugins.replication.pull.FetchResultProcessing;
+import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class FetchCommand implements Command {
+
+  private ReplicationState.Factory fetchReplicationStateFactory;
+  private PullReplicationStateLogger fetchStateLog;
+  private SourcesCollection sources;
+
+  @Inject
+  public FetchCommand(
+      ReplicationState.Factory fetchReplicationStateFactory,
+      PullReplicationStateLogger fetchStateLog,
+      SourcesCollection sources) {
+    this.fetchReplicationStateFactory = fetchReplicationStateFactory;
+    this.fetchStateLog = fetchStateLog;
+    this.sources = sources;
+  }
+
+  public void fetch(Project.NameKey name, String label, String objectId)
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    ReplicationState state =
+        fetchReplicationStateFactory.create(new FetchResultProcessing.CommandProcessing(this));
+    Optional<Source> source =
+        sources.getAll().stream().filter(s -> s.getRemoteConfigName().equals(label)).findFirst();
+    if (!source.isPresent()) {
+      String msg = String.format("Remote configuration section %s not found", label);
+      fetchStateLog.error(msg, state);
+      throw new RemoteConfigurationMissingException(msg);
+    }
+
+    try {
+      Future<?> future = source.get().schedule(name, objectId, state, true);
+      state.markAllFetchTasksScheduled();
+      future.get(source.get().getTimeout(), TimeUnit.SECONDS);
+    } catch (ExecutionException
+        | IllegalStateException
+        | TimeoutException
+        | InterruptedException e) {
+      fetchStateLog.error("Exception during the fetch operation", e, state);
+      throw e;
+    }
+
+    try {
+      state.waitForReplication(source.get().getTimeout());
+    } catch (InterruptedException e) {
+      writeStdErrSync("We are interrupted while waiting replication to complete");
+      throw e;
+    }
+  }
+
+  @Override
+  public void writeStdOutSync(String message) {}
+
+  @Override
+  public void writeStdErrSync(String message) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
new file mode 100644
index 0000000..470c985
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/PullReplicationApiModule.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.gerrit.server.project.ProjectResource.PROJECT_KIND;
+
+import com.google.gerrit.extensions.restapi.RestApiModule;
+import com.google.inject.Scopes;
+
+public class PullReplicationApiModule extends RestApiModule {
+  @Override
+  protected void configure() {
+    bind(FetchAction.class).in(Scopes.SINGLETON);
+    post(PROJECT_KIND, "fetch").to(FetchAction.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/RemoteConfigurationMissingException.java b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/RemoteConfigurationMissingException.java
new file mode 100644
index 0000000..6ed0ef0
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/pull/api/exception/RemoteConfigurationMissingException.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api.exception;
+
+public class RemoteConfigurationMissingException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  public RemoteConfigurationMissingException(String msg) {
+    super(msg);
+  }
+}
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index aa59a2d..365d4f6 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -109,17 +109,15 @@
 
 	Can be overridden at remote-level by setting replicationMaxRetries.
 
-	By default, fetchs are retried indefinitely.
+	By default, fetches are retried indefinitely.
 
 remote.NAME.url
-:	Address of the remote server to fetch from.  Multiple URLs may be
-	specified within a single remote block, listing different
-	destinations which share the same settings.  Assuming
-	sufficient threads in the thread pool, Gerrit fetchs from all
-	URLs in parallel, using one thread per URL.
-
-    TODO: This doesn't apply here: you can only replicate from one url
-
+:	Address of the remote server to fetch from. Single URL can be
+	specified within a single remote block. A remote node can request
+	a fetch from a source. However, it will need to be sure that the
+	source corresponds to only *him* configured as remote. Fetching from
+	multiple sources at the same time, whilst it was useful in a push scenario,
+	doesn't make so much sense in a pull-replication flow.
 	Within each URL value the magic placeholder `${name}` is
 	replaced with the Gerrit project name.  This is a Gerrit
 	specific extension to the otherwise standard Git URL syntax
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
new file mode 100644
index 0000000..045b882
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchActionTest.java
@@ -0,0 +1,231 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.http.HttpStatus.SC_ACCEPTED;
+import static org.apache.http.HttpStatus.SC_CREATED;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.restapi.BadRequestException;
+import com.google.gerrit.extensions.restapi.Response;
+import com.google.gerrit.extensions.restapi.RestApiException;
+import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
+import com.google.gerrit.server.config.UrlFormatter;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.git.WorkQueue.Task;
+import com.google.gerrit.server.project.ProjectResource;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FetchActionTest {
+  FetchAction fetchAction;
+  String label = "instance-2-label";
+  String url = "file:///gerrit-host/instance-1/git/${name}.git";
+  String objectId = "c90989ed7a8ab01f1bdd022872428f020b866358";
+  String location = "http://gerrit-host/a/config/server/tasks/08d173e9";
+  int taskId = 1234;
+
+  @Mock FetchCommand fetchCommand;
+  @Mock ProjectResource projectResource;
+  @Mock WorkQueue workQueue;
+  @Mock ScheduledExecutorService exceutorService;
+  @Mock DynamicItem<UrlFormatter> urlFormatterDynamicItem;
+  @Mock UrlFormatter urlFormatter;
+  @Mock WorkQueue.Task<Void> task;
+
+  @Before
+  public void setup() {
+    when(workQueue.getDefaultQueue()).thenReturn(exceutorService);
+    when(urlFormatter.getRestUrl(anyString())).thenReturn(Optional.of(location));
+    when(exceutorService.submit(any(Runnable.class)))
+        .thenAnswer(
+            new Answer<WorkQueue.Task<Void>>() {
+              @Override
+              public Task<Void> answer(InvocationOnMock invocation) throws Throwable {
+                return task;
+              }
+            });
+    when(urlFormatterDynamicItem.get()).thenReturn(urlFormatter);
+    when(task.getTaskId()).thenReturn(taskId);
+
+    fetchAction = new FetchAction(fetchCommand, workQueue, urlFormatterDynamicItem);
+  }
+
+  @Test
+  public void shouldReturnCreatedResponseCode() throws RestApiException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+
+    Response<?> response = fetchAction.apply(projectResource, inputParams);
+
+    assertThat(response.statusCode()).isEqualTo(SC_CREATED);
+  }
+
+  @SuppressWarnings("cast")
+  @Test
+  public void shouldReturnSourceUrlAndObjectIdAsAResponseBody() throws Exception {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+
+    Response<?> response = fetchAction.apply(projectResource, inputParams);
+
+    assertThat((FetchAction.Input) response.value()).isEqualTo(inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingLabel() throws Exception {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.objectId = objectId;
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenEmptyLabel() throws Exception {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = "";
+    inputParams.objectId = objectId;
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenMissingObjectId() throws Exception {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = BadRequestException.class)
+  public void shouldThrowBadRequestExceptionWhenEmptyObjectId() throws Exception {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = "";
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowRestApiExceptionWhenPocessingInterrupted()
+      throws RestApiException, InterruptedException, ExecutionException,
+          RemoteConfigurationMissingException, TimeoutException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+
+    doThrow(new InterruptedException()).when(fetchCommand).fetch(any(), any(), any());
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = UnprocessableEntityException.class)
+  public void shouldThrowRestApiExceptionWhenNoSurceForGivenLabel()
+      throws RestApiException, InterruptedException, ExecutionException,
+          RemoteConfigurationMissingException, TimeoutException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = "non-existing-label";
+    inputParams.objectId = objectId;
+
+    doThrow(new RemoteConfigurationMissingException(""))
+        .when(fetchCommand)
+        .fetch(any(), any(), any());
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowRestApiExceptionWhenIssueDuringPocessing()
+      throws RestApiException, InterruptedException, ExecutionException,
+          RemoteConfigurationMissingException, TimeoutException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+
+    doThrow(new ExecutionException(new RuntimeException()))
+        .when(fetchCommand)
+        .fetch(any(), any(), any());
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowRestApiExceptionWhenIssueWithUrlParam()
+      throws RestApiException, InterruptedException, ExecutionException,
+          RemoteConfigurationMissingException, TimeoutException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+
+    doThrow(new IllegalStateException()).when(fetchCommand).fetch(any(), any(), any());
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test(expected = RestApiException.class)
+  public void shouldThrowRestApiExceptionWhenTimeout()
+      throws RestApiException, InterruptedException, ExecutionException,
+          RemoteConfigurationMissingException, TimeoutException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+
+    doThrow(new TimeoutException()).when(fetchCommand).fetch(any(), any(), any());
+
+    fetchAction.apply(projectResource, inputParams);
+  }
+
+  @Test
+  public void shouldReturnScheduledTaskForAsyncCall() throws RestApiException {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+    inputParams.async = true;
+
+    Response<?> response = fetchAction.apply(projectResource, inputParams);
+    assertThat(response.statusCode()).isEqualTo(SC_ACCEPTED);
+  }
+
+  @Test
+  public void shouldLocationHeaderForAsyncCall() throws Exception {
+    FetchAction.Input inputParams = new FetchAction.Input();
+    inputParams.label = label;
+    inputParams.objectId = objectId;
+    inputParams.async = true;
+
+    Response<?> response = fetchAction.apply(projectResource, inputParams);
+    assertThat(response).isInstanceOf(Response.Accepted.class);
+    Response.Accepted acceptResponse = (Response.Accepted) response;
+    assertThat(acceptResponse.location()).isEqualTo(location);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
new file mode 100644
index 0000000..007bfdb
--- /dev/null
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/pull/api/FetchCommandTest.java
@@ -0,0 +1,146 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.replication.pull.api;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static com.google.gerrit.testing.GerritJUnit.assertThrows;
+
+import com.google.common.collect.Lists;
+import com.google.gerrit.entities.Project;
+import com.googlesource.gerrit.plugins.replication.pull.PullReplicationStateLogger;
+import com.googlesource.gerrit.plugins.replication.pull.ReplicationState;
+import com.googlesource.gerrit.plugins.replication.pull.Source;
+import com.googlesource.gerrit.plugins.replication.pull.SourcesCollection;
+import com.googlesource.gerrit.plugins.replication.pull.api.exception.RemoteConfigurationMissingException;
+import java.net.URISyntaxException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jgit.transport.URIish;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class FetchCommandTest {
+  private static final String OBJECT_ID_TO_FETCH = "8f22c45da5ce298291b7329552568aae1bb62c10";
+  @Mock ReplicationState state;
+  @Mock ReplicationState.Factory fetchReplicationStateFactory;
+  @Mock PullReplicationStateLogger fetchStateLog;
+  @Mock Source source;
+  @Mock SourcesCollection sources;
+
+  @SuppressWarnings("rawtypes")
+  @Mock
+  Future future;
+
+  Project.NameKey projectName;
+  URIish uri;
+  String label;
+
+  FetchCommand objectUnderTest;
+
+  @Before
+  public void setup() throws URISyntaxException {
+    projectName = Project.nameKey("sample_project");
+    uri = new URIish("file://sample_host/repository_path/repo.git");
+    label = "instance-1-label";
+
+    when(fetchReplicationStateFactory.create(any())).thenReturn(state);
+    when(source.getRemoteConfigName()).thenReturn(label);
+    when(sources.getAll()).thenReturn(Lists.newArrayList(source));
+    when(source.schedule(projectName, OBJECT_ID_TO_FETCH, state, true))
+        .thenReturn(CompletableFuture.completedFuture(null));
+    objectUnderTest = new FetchCommand(fetchReplicationStateFactory, fetchStateLog, sources);
+  }
+
+  @Test
+  public void shouldFetchRefUpdate()
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    objectUnderTest.fetch(projectName, label, OBJECT_ID_TO_FETCH);
+
+    verify(source, times(1)).schedule(projectName, OBJECT_ID_TO_FETCH, state, true);
+  }
+
+  @Test
+  public void shouldMarkAllFetchTasksScheduled()
+      throws InterruptedException, ExecutionException, RemoteConfigurationMissingException,
+          TimeoutException {
+    objectUnderTest.fetch(projectName, label, OBJECT_ID_TO_FETCH);
+
+    verify(source, times(1)).schedule(projectName, OBJECT_ID_TO_FETCH, state, true);
+    verify(state, times(1)).markAllFetchTasksScheduled();
+  }
+
+  @Test
+  public void shouldUpdateStateWhenRemoteConfigNameIsMissing() {
+      assertThrows(RemoteConfigurationMissingException.class, () -> objectUnderTest.fetch(projectName, "unknownLabel", OBJECT_ID_TO_FETCH));
+      verify(fetchStateLog, times(1)).error(anyString(), eq(state));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldUpdateStateWhenInterruptedException()
+      throws InterruptedException, ExecutionException,
+          TimeoutException {
+    when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new InterruptedException());
+    when(source.schedule(projectName, OBJECT_ID_TO_FETCH, state, true)).thenReturn(future);
+
+    InterruptedException e = assertThrows(InterruptedException.class,() -> objectUnderTest.fetch(projectName, label, OBJECT_ID_TO_FETCH));
+    verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldUpdateStateWhenExecutionException()
+      throws InterruptedException, ExecutionException,
+          TimeoutException {
+    when(future.get(anyLong(), eq(TimeUnit.SECONDS)))
+        .thenThrow(new ExecutionException(new Exception()));
+    when(source.schedule(projectName, OBJECT_ID_TO_FETCH, state, true)).thenReturn(future);
+
+    ExecutionException e =
+        assertThrows(
+            ExecutionException.class,
+            () -> objectUnderTest.fetch(projectName, label, OBJECT_ID_TO_FETCH));
+    verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void shouldUpdateStateWhenTimeoutException()
+      throws InterruptedException, ExecutionException,
+          TimeoutException {
+    when(future.get(anyLong(), eq(TimeUnit.SECONDS))).thenThrow(new TimeoutException());
+    when(source.schedule(projectName, OBJECT_ID_TO_FETCH, state, true)).thenReturn(future);
+
+    TimeoutException e =
+        assertThrows(
+            TimeoutException.class,
+            () -> objectUnderTest.fetch(projectName, label, OBJECT_ID_TO_FETCH));
+    verify(fetchStateLog, times(1)).error(anyString(), eq(e), eq(state));
+  }
+}