Merge branch 'stable-2.16' into stable-3.0

* stable-2.16:
  Revert "Do not reload config when queue is not ready"
  Do not reload config when queue is not ready
  Revert "Remove replication event from pending when runway is allowed"
  ReplicationFileBasedConfig: Fix setting default sshConnectionTimeout
  SshHelper: Add class javadoc
  Make more classes and fields public to ease extensibility
  Allow to configure timeout for SSH connection and commands
  StartCommand:  Fix synchronization on non-final field
  Destination: Suppress FutureReturnValueIgnored warning

Change-Id: I101dbbcfc3f049ebf275f4bff6c4a99e53771416
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
index 6b42464..bec2019 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/AutoReloadConfigDecorator.java
@@ -160,4 +160,14 @@
         autoReloadExecutor.scheduleAtFixedRate(
             this::reloadIfNeeded, RELOAD_DELAY, RELOAD_INTERVAL, TimeUnit.SECONDS);
   }
+
+  @Override
+  public synchronized int getSshConnectionTimeout() {
+    return currentConfig.getSshConnectionTimeout();
+  }
+
+  @Override
+  public synchronized int getSshCommandTimeout() {
+    return currentConfig.getSshCommandTimeout();
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index f2a1902..2677152 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -69,6 +69,7 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import org.apache.commons.io.FilenameUtils;
@@ -398,7 +399,9 @@
         e = opFactory.create(project, uri);
         addRef(e, ref);
         e.addState(ref, state);
-        pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
+        @SuppressWarnings("unused")
+        ScheduledFuture<?> ignored =
+            pool.schedule(e, now ? 0 : config.getDelay(), TimeUnit.SECONDS);
         pending.put(uri, e);
       } else if (!e.getRefs().contains(ref)) {
         addRef(e, ref);
@@ -425,11 +428,15 @@
   }
 
   void scheduleDeleteProject(URIish uri, Project.NameKey project) {
-    pool.schedule(deleteProjectFactory.create(uri, project), 0, TimeUnit.SECONDS);
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> ignored =
+        pool.schedule(deleteProjectFactory.create(uri, project), 0, TimeUnit.SECONDS);
   }
 
   void scheduleUpdateHead(URIish uri, Project.NameKey project, String newHead) {
-    pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
+    @SuppressWarnings("unused")
+    ScheduledFuture<?> ignored =
+        pool.schedule(updateHeadFactory.create(uri, project, newHead), 0, TimeUnit.SECONDS);
   }
 
   private void addRef(PushOne e, String ref) {
@@ -507,7 +514,9 @@
         pending.put(uri, pushOp);
         switch (reason) {
           case COLLISION:
-            pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
+            @SuppressWarnings("unused")
+            ScheduledFuture<?> ignored =
+                pool.schedule(pushOp, config.getRescheduleDelay(), TimeUnit.SECONDS);
             break;
           case TRANSPORT_ERROR:
           case REPOSITORY_MISSING:
@@ -519,7 +528,9 @@
             postReplicationFailedEvent(pushOp, status);
             if (pushOp.setToRetry()) {
               postReplicationScheduledEvent(pushOp);
-              pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
+              @SuppressWarnings("unused")
+              ScheduledFuture<?> ignored2 =
+                  pool.schedule(pushOp, config.getRetryDelay(), TimeUnit.MINUTES);
             } else {
               pushOp.canceledByReplication();
               pending.remove(uri);
@@ -538,11 +549,11 @@
       if (op.wasCanceled()) {
         return RunwayStatus.canceled();
       }
+      pending.remove(op.getURI());
       PushOne inFlightOp = inFlight.get(op.getURI());
       if (inFlightOp != null) {
         return RunwayStatus.denied(inFlightOp.getId());
       }
-      pending.remove(op.getURI());
       inFlight.put(op.getURI(), op);
     }
     return RunwayStatus.allowed();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 856ffb1..b2d0de2 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -19,7 +19,7 @@
 import org.eclipse.jgit.lib.Config;
 import org.eclipse.jgit.transport.RemoteConfig;
 
-class DestinationConfiguration {
+public class DestinationConfiguration {
   static final int DEFAULT_REPLICATION_DELAY = 15;
   static final int DEFAULT_RESCHEDULE_DELAY = 3;
 
@@ -40,7 +40,7 @@
   private final RemoteConfig remoteConfig;
   private final int maxRetries;
 
-  DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
+  protected DestinationConfiguration(RemoteConfig remoteConfig, Config cfg) {
     this.remoteConfig = remoteConfig;
     String name = remoteConfig.getName();
     urls = ImmutableList.copyOf(cfg.getStringList("remote", name, "url"));
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
index 364f1b4..fccdb7b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicatedEvent.java
@@ -21,13 +21,13 @@
 import org.eclipse.jgit.transport.RemoteRefUpdate.Status;
 
 public class RefReplicatedEvent extends RefEvent {
-  static final String TYPE = "ref-replicated";
+  public static final String TYPE = "ref-replicated";
 
-  final String project;
-  final String ref;
-  final String targetNode;
-  final String status;
-  final Status refStatus;
+  public final String project;
+  public final String ref;
+  public final String targetNode;
+  public final String status;
+  public final Status refStatus;
 
   public RefReplicatedEvent(
       String project,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
index 90595b3..4789a96 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/RefReplicationDoneEvent.java
@@ -18,7 +18,7 @@
 import com.google.gerrit.server.events.RefEvent;
 
 public class RefReplicationDoneEvent extends RefEvent {
-  static final String TYPE = "ref-replication-done";
+  public static final String TYPE = "ref-replication-done";
 
   final String project;
   final String ref;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
index 1c62ab7..929c538 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationConfig.java
@@ -47,4 +47,8 @@
   int shutdown();
 
   void startup(WorkQueue workQueue);
+
+  int getSshConnectionTimeout();
+
+  int getSshCommandTimeout();
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
index 64eacf9..a075f12 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationFileBasedConfig.java
@@ -17,6 +17,8 @@
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isGerritHttp;
 import static com.googlesource.gerrit.plugins.replication.AdminApiFactory.isSSH;
 import static com.googlesource.gerrit.plugins.replication.ReplicationQueue.repLog;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 
 import com.google.common.base.Strings;
@@ -29,6 +31,7 @@
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.annotations.PluginData;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.config.ConfigUtil;
 import com.google.gerrit.server.config.SitePaths;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Inject;
@@ -52,6 +55,7 @@
 @Singleton
 public class ReplicationFileBasedConfig implements ReplicationConfig {
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private static final int DEFAULT_SSH_CONNECTION_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
 
   private List<Destination> destinations;
   private final SitePaths site;
@@ -59,6 +63,8 @@
   private boolean replicateAllOnPluginStart;
   private boolean defaultForceUpdate;
   private int maxRefsToLog;
+  private int sshCommandTimeout;
+  private int sshConnectionTimeout = DEFAULT_SSH_CONNECTION_TIMEOUT_MS;
   private final FileBasedConfig config;
   private final Path pluginDataDir;
 
@@ -124,6 +130,18 @@
 
     maxRefsToLog = config.getInt("gerrit", "maxRefsToLog", 0);
 
+    sshCommandTimeout =
+        (int) ConfigUtil.getTimeUnit(config, "gerrit", null, "sshCommandTimeout", 0, SECONDS);
+    sshConnectionTimeout =
+        (int)
+            ConfigUtil.getTimeUnit(
+                config,
+                "gerrit",
+                null,
+                "sshConnectionTimeout",
+                DEFAULT_SSH_CONNECTION_TIMEOUT_MS,
+                MILLISECONDS);
+
     ImmutableList.Builder<Destination> dest = ImmutableList.builder();
     for (RemoteConfig c : allRemotes(config)) {
       if (c.getURIs().isEmpty()) {
@@ -308,4 +326,14 @@
       cfg.start(workQueue);
     }
   }
+
+  @Override
+  public int getSshConnectionTimeout() {
+    return sshConnectionTimeout;
+  }
+
+  @Override
+  public int getSshCommandTimeout() {
+    return sshCommandTimeout;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
index 301219f..aa965fe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationScheduledEvent.java
@@ -18,11 +18,11 @@
 import com.google.gerrit.server.events.RefEvent;
 
 public class ReplicationScheduledEvent extends RefEvent {
-  static final String TYPE = "ref-replication-scheduled";
+  public static final String TYPE = "ref-replication-scheduled";
 
-  final String project;
-  final String ref;
-  final String targetNode;
+  public final String project;
+  public final String ref;
+  public final String targetNode;
 
   public ReplicationScheduledEvent(String project, String ref, String targetNode) {
     super(TYPE);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
index cfa95dd..f2d55de 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationStateLogger.java
@@ -26,7 +26,7 @@
  * state to the stderr console.
  */
 @Singleton
-class ReplicationStateLogger implements ReplicationStateListener {
+public class ReplicationStateLogger implements ReplicationStateListener {
 
   @Override
   public void warn(String msg, ReplicationState... states) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java b/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java
index 68e9652..d73c101 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/SshHelper.java
@@ -25,19 +25,23 @@
 import org.eclipse.jgit.util.FS;
 import org.eclipse.jgit.util.io.StreamCopyThread;
 
-class SshHelper {
-  private static final int SSH_REMOTE_TIMEOUT = 120 * 1000; // 2 minutes = 120 * 1000ms
-
+/** Utility class that provides SSH access to remote URIs. */
+public class SshHelper {
   private final Provider<SshSessionFactory> sshSessionFactoryProvider;
+  private final int commandTimeout;
+  private final int connectionTimeout;
 
   @Inject
-  SshHelper(Provider<SshSessionFactory> sshSessionFactoryProvider) {
+  protected SshHelper(
+      ReplicationConfig replicationConfig, Provider<SshSessionFactory> sshSessionFactoryProvider) {
     this.sshSessionFactoryProvider = sshSessionFactoryProvider;
+    this.commandTimeout = replicationConfig.getSshCommandTimeout();
+    this.connectionTimeout = replicationConfig.getSshConnectionTimeout();
   }
 
-  int executeRemoteSsh(URIish uri, String cmd, OutputStream errStream) throws IOException {
+  public int executeRemoteSsh(URIish uri, String cmd, OutputStream errStream) throws IOException {
     RemoteSession ssh = connect(uri);
-    Process proc = ssh.exec(cmd, 0);
+    Process proc = ssh.exec(cmd, commandTimeout);
     proc.getOutputStream().close();
     StreamCopyThread out = new StreamCopyThread(proc.getInputStream(), errStream);
     StreamCopyThread err = new StreamCopyThread(proc.getErrorStream(), errStream);
@@ -54,7 +58,7 @@
     return proc.exitValue();
   }
 
-  OutputStream newErrorBufferStream() {
+  public OutputStream newErrorBufferStream() {
     return new OutputStream() {
       private final StringBuilder out = new StringBuilder();
       private final StringBuilder line = new StringBuilder();
@@ -83,7 +87,7 @@
     };
   }
 
-  RemoteSession connect(URIish uri) throws TransportException {
-    return sshSessionFactoryProvider.get().getSession(uri, null, FS.DETECTED, SSH_REMOTE_TIMEOUT);
+  protected RemoteSession connect(URIish uri) throws TransportException {
+    return sshSessionFactoryProvider.get().getSession(uri, null, FS.DETECTED, connectionTimeout);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
index 77bc285..20bdf65 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/StartCommand.java
@@ -53,6 +53,8 @@
 
   @Inject private ReplicationState.Factory replicationStateFactory;
 
+  private final Object lock = new Object();
+
   @Override
   protected void run() throws Failure {
     if (all && projectPatterns.size() > 0) {
@@ -100,7 +102,7 @@
 
   public void writeStdOutSync(String message) {
     if (wait) {
-      synchronized (stdout) {
+      synchronized (lock) {
         stdout.println(message);
         stdout.flush();
       }
@@ -109,7 +111,7 @@
 
   public void writeStdErrSync(String message) {
     if (wait) {
-      synchronized (stderr) {
+      synchronized (lock) {
         stderr.println(message);
         stderr.flush();
       }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index 6493761..f58845d 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -86,6 +86,14 @@
 :	Number of refs, that are pushed during replication, to be logged.
 	For printing all refs to the logs, use a value of 0. By default, 0.
 
+gerrit.sshCommandTimeout
+:	Timeout for SSH command execution. If 0, there is no timeout and
+	the client waits indefinitely. By default, 0.
+
+gerrit.sshConnectionTimeout
+:	Timeout for SSH connections. If 0, there is no timeout and
+        the client waits indefinitely. By default, 2 minutes.
+
 replication.lockErrorMaxRetries
 :	Number of times to retry a replication operation if a lock
 	error is detected.