Merge "Avoid starting tasks which are not present under waiting storage area"
diff --git a/BUILD b/BUILD
index 1ed5cf3..2a6dfd5 100644
--- a/BUILD
+++ b/BUILD
@@ -16,7 +16,6 @@
     resources = glob(["src/main/resources/**/*"]),
     deps = [
         "//lib/auto:auto-value-gson",
-        "//lib/commons:io",
     ],
 )
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
index ff3fdf3..89b97e9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ChainedScheduler.java
@@ -40,7 +40,7 @@
    */
   public interface Runner<T> {
     /** Will get executed in the thread pool task for each item */
-    default void run(T item) {}
+    void run(T item);
 
     /** Will get called after the last item completes */
     default void onDone() {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
index ab8c476..12f0273 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/Destination.java
@@ -26,6 +26,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet.Builder;
 import com.google.common.collect.Lists;
+import com.google.common.io.Files;
 import com.google.gerrit.entities.AccountGroup;
 import com.google.gerrit.entities.BranchNameKey;
 import com.google.gerrit.entities.GroupReference;
@@ -78,7 +79,6 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.eclipse.jgit.lib.Constants;
 import org.eclipse.jgit.lib.Ref;
@@ -698,7 +698,7 @@
     } else if (remoteNameStyle.equals("underscore")) {
       name = name.replace("/", "_");
     } else if (remoteNameStyle.equals("basenameOnly")) {
-      name = FilenameUtils.getBaseName(name);
+      name = Files.getNameWithoutExtension(name);
     } else if (!remoteNameStyle.equals("slash")) {
       repLog.atFine().log("Unknown remoteNameStyle: %s, falling back to slash", remoteNameStyle);
     }
@@ -741,8 +741,8 @@
     return config.getProjects();
   }
 
-  int getLockErrorMaxRetries() {
-    return config.getLockErrorMaxRetries();
+  int getUpdateRefErrorMaxRetries() {
+    return config.getUpdateRefErrorMaxRetries();
   }
 
   String getRemoteConfigName() {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
index 4b757ea..1b39374 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/DestinationConfiguration.java
@@ -31,7 +31,7 @@
   private final int rescheduleDelay;
   private final int retryDelay;
   private final int drainQueueAttempts;
-  private final int lockErrorMaxRetries;
+  private final int updateRefErrorMaxRetries;
   private final ImmutableList<String> adminUrls;
   private final int poolThreads;
   private final boolean createMissingRepos;
@@ -60,8 +60,11 @@
         Math.max(0, getInt(remoteConfig, cfg, "drainQueueAttempts", DEFAULT_DRAIN_QUEUE_ATTEMPTS));
     poolThreads = Math.max(0, getInt(remoteConfig, cfg, "threads", 1));
     authGroupNames = ImmutableList.copyOf(cfg.getStringList("remote", name, "authGroup"));
-    lockErrorMaxRetries = cfg.getInt("replication", "lockErrorMaxRetries", 0);
-
+    updateRefErrorMaxRetries =
+        cfg.getInt(
+            "replication",
+            "updateRefErrorMaxRetries",
+            cfg.getInt("replication", "lockErrorMaxRetries", 0));
     createMissingRepos = cfg.getBoolean("remote", name, "createMissingRepositories", true);
     replicatePermissions = cfg.getBoolean("remote", name, "replicatePermissions", true);
     replicateProjectDeletions = cfg.getBoolean("remote", name, "replicateProjectDeletions", false);
@@ -106,8 +109,8 @@
     return poolThreads;
   }
 
-  public int getLockErrorMaxRetries() {
-    return lockErrorMaxRetries;
+  public int getUpdateRefErrorMaxRetries() {
+    return updateRefErrorMaxRetries;
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
index 3d06d47..565790c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/PushOne.java
@@ -91,6 +91,17 @@
   static final String ALL_REFS = "..all..";
   static final String ID_KEY = "pushOneId";
 
+  // The string here needs to match the one returned by Git(versions prior to 2014) server.
+  // See:
+  // https://github.com/git/git/blob/b4d75ac1d152bbab44b0777a4cc0c48db75f6024/builtin/receive-pack.c#L587
+  // https://github.com/eclipse/jgit/blob/8774f541904ca9afba1786b4da14c1aedf4dda78/org.eclipse.jgit/src/org/eclipse/jgit/transport/ReceivePack.java#L1859
+  static final String LOCK_FAILURE = "failed to lock";
+
+  // The string here needs to match the one returned by Git server.
+  // See:
+  // https://github.com/git/git/blob/e67fbf927dfdf13d0b21dc6ea15dc3c7ef448ea0/builtin/receive-pack.c#L1611
+  static final String UPDATE_REF_FAILURE = "failed to update ref";
+
   interface Factory {
     PushOne create(Project.NameKey d, URIish u);
   }
@@ -114,8 +125,8 @@
   private final int maxRetries;
   private boolean canceled;
   private final ListMultimap<String, ReplicationState> stateMap = LinkedListMultimap.create();
-  private final int maxLockRetries;
-  private int lockRetryCount;
+  private final int maxUpdateRefRetries;
+  private int updateRefRetryCount;
   private final int id;
   private final long createdAt;
   private final ReplicationMetrics metrics;
@@ -151,8 +162,8 @@
     threadScoper = ts;
     projectName = d;
     uri = u;
-    lockRetryCount = 0;
-    maxLockRetries = pool.getLockErrorMaxRetries();
+    updateRefRetryCount = 0;
+    maxUpdateRefRetries = pool.getUpdateRefErrorMaxRetries();
     id = ig.next();
     stateLog = sl;
     createdAt = System.nanoTime();
@@ -397,12 +408,12 @@
       Throwable cause = e.getCause();
       if (cause instanceof JSchException && cause.getMessage().startsWith("UnknownHostKey:")) {
         repLog.atSevere().log("Cannot replicate to %s: %s", uri, cause.getMessage());
-      } else if (e instanceof LockFailureException) {
-        lockRetryCount++;
-        repLog.atSevere().log("Cannot replicate to %s due to lock failure", uri);
+      } else if (e instanceof UpdateRefFailureException) {
+        updateRefRetryCount++;
+        repLog.atSevere().log("Cannot replicate to %s due to a lock or write ref failure", uri);
 
         // The remote push operation should be retried.
-        if (lockRetryCount <= maxLockRetries) {
+        if (updateRefRetryCount <= maxUpdateRefRetries) {
           if (canceledWhileRunning.get()) {
             logCanceledWhileRunningException(e);
           } else {
@@ -410,7 +421,8 @@
           }
         } else {
           repLog.atSevere().log(
-              "Giving up after %d lock failures during replication to %s", lockRetryCount, uri);
+              "Giving up after %d '%s' failures during replication to %s",
+              updateRefRetryCount, e.getMessage(), uri);
         }
       } else {
         if (canceledWhileRunning.get()) {
@@ -682,7 +694,8 @@
     cmds.add(new RemoteRefUpdate(git, (Ref) null, dst, force, null, null));
   }
 
-  private void updateStates(Collection<RemoteRefUpdate> refUpdates) throws LockFailureException {
+  private void updateStates(Collection<RemoteRefUpdate> refUpdates)
+      throws UpdateRefFailureException {
     Set<String> doneRefs = new HashSet<>();
     boolean anyRefFailed = false;
     RemoteRefUpdate.Status lastRefStatusError = RemoteRefUpdate.Status.OK;
@@ -726,8 +739,9 @@
                         + " of destination repository.",
                     u.getRemoteName(), uri),
                 logStatesArray);
-          } else if ("failed to lock".equals(u.getMessage())) {
-            throw new LockFailureException(uri, u.getMessage());
+          } else if (LOCK_FAILURE.equals(u.getMessage())
+              || UPDATE_REF_FAILURE.equals(u.getMessage())) {
+            throw new UpdateRefFailureException(uri, u.getMessage());
           } else {
             stateLog.error(
                 String.format(
@@ -766,10 +780,10 @@
     stateMap.clear();
   }
 
-  public static class LockFailureException extends TransportException {
+  public static class UpdateRefFailureException extends TransportException {
     private static final long serialVersionUID = 1L;
 
-    LockFailureException(URIish uri, String message) {
+    UpdateRefFailureException(URIish uri, String message) {
       super(uri, message);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
index 641ac7d..66be6dd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationQueue.java
@@ -188,7 +188,7 @@
 
   private void firePendingEvents() {
     replaying = true;
-    new ChainedScheduler.StreamScheduler<ReplicationTasksStorage.ReplicateRefUpdate>(
+    new ChainedScheduler.StreamScheduler<>(
         workQueue.getDefaultQueue(),
         replicationTasksStorage.streamWaiting(),
         new ChainedScheduler.Runner<ReplicationTasksStorage.ReplicateRefUpdate>() {
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index fc3352d..6a2cf26 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -108,19 +108,23 @@
 	value for this is approximately the smallest remote.NAME.replicationDelay
 	divided by 5.
 
-replication.lockErrorMaxRetries
-:	Number of times to retry a replication operation if a lock
-	error is detected.
+<a name="replication.updateRefErrorMaxRetries">replication.updateRefErrorMaxRetries</a>
+:	Number of times to retry a replication operation if an update
+	ref error is detected.
 
 	If two or more replication operations (to the same GIT and Ref)
 	are scheduled at approximately the same time (and end up on different
 	replication threads), there is a large probability that the last
-	push to complete will fail with a remote "failure to lock" error.
+	push to complete will fail with a remote "failed to update ref" error.
+	This error may also occur due to a transient issue like file system
+	being full which was previously returned as "failed to write" by git.
+
 	This option allows Gerrit to retry the replication push when the
-	"failure to lock" error is detected.
+	"failed to update ref" error is detected. Also retry when the error
+	"failed to lock" is detected as that is the legacy string used by git.
 
 	A good value would be 3 retries or less, depending on how often
-	you see lockError collisions in your server logs. A too highly set
+	you see updateRefError collisions in your server logs. A too highly set
 	value risks keeping around the replication operations in the queue
 	for a long time, and the number of items in the queue will increase
 	with time.
@@ -135,6 +139,16 @@
 
 	Default: 0 (disabled, i.e. never retry)
 
+replication.lockErrorMaxRetries
+:	Refer to the [replication.updateRefErrorMaxRetries][4] section.
+
+	If both `lockErrorMaxRetries` and `updateRefErrorMaxRetries` are
+	configured, then `updateRefErrorMaxRetries` takes precedence.
+
+	Default: 0 (disabled, i.e. never retry)
+
+[4]: #replication.updateRefErrorMaxRetries
+
 replication.maxRetries
 :	Maximum number of times to retry a push operation that previously
 	failed.
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
index 1ad8c4d..90191f2 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ChainedSchedulerTest.java
@@ -238,7 +238,7 @@
     TestRunner runner = new TestRunner();
     List<String> items = new ArrayList<>();
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
     assertThat(runner.awaitDone(1)).isEqualTo(true);
     assertThat(runner.runCount()).isEqualTo(0);
   }
@@ -250,7 +250,7 @@
     List<String> items = new ArrayList<>();
     items.add(FIRST);
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
     assertThat(runner.awaitDone(1)).isEqualTo(true);
     assertThat(runner.runCount()).isEqualTo(1);
   }
@@ -261,7 +261,7 @@
     TestRunner runner = new TestRunner();
     List<String> items = createManyItems();
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
     assertThat(runner.awaitDone(items.size())).isEqualTo(true);
     assertThat(runner.runCount()).isEqualTo(items.size());
   }
@@ -282,7 +282,7 @@
     items.add(SECOND);
     items.add(THIRD);
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
     assertThat(runner.awaitDone(items.size())).isEqualTo(true);
     assertThat(runner.runCount()).isEqualTo(items.size());
   }
@@ -293,7 +293,7 @@
     WaitingRunner runner = new WaitingRunner();
     List<String> items = createManyItems();
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
     for (int i = 1; i <= items.size(); i++) {
       assertThat(runner.isDone()).isEqualTo(false);
       runner.runOneRandomStarted();
@@ -312,7 +312,7 @@
       List<String> items = createItems(threads + 1 /* Running */ + 1 /* Waiting */);
       CountingIterator it = new CountingIterator(items.iterator());
 
-      new ChainedScheduler(executor, it, runner);
+      new ChainedScheduler<>(executor, it, runner);
       assertThat(runner.awaitStart(FIRST, 1)).isEqualTo(true); // Confirms at least one Running
       assertThat(it.count).isGreaterThan(1); // Confirms at least one extra Waiting or Running
       assertThat(it.count).isLessThan(items.size()); // Confirms at least one still not queued
@@ -341,7 +341,7 @@
     WaitingRunner runner = new WaitingRunner();
     List<String> items = createManyItems();
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
 
     for (int j = 1; j <= MANY_ITEMS_SIZE; j += threads) {
       // If #threads items can start before any complete, it proves #threads are
@@ -372,7 +372,7 @@
       batches.add(executeWaitingRunnableBatch(batchSize, executor));
     }
 
-    new ChainedScheduler(executor, items.iterator(), runner);
+    new ChainedScheduler<>(executor, items.iterator(), runner);
 
     for (int i = 1; i <= items.size(); i++) {
       for (int b = 0; b < blockSize; b++) {
@@ -399,7 +399,8 @@
     TestRunner runner = new TestRunner();
     List<String> items = createManyItems();
 
-    new ChainedScheduler(executor, items.iterator(), new ChainedScheduler.ForwardingRunner(runner));
+    new ChainedScheduler<>(
+        executor, items.iterator(), new ChainedScheduler.ForwardingRunner<>(runner));
     assertThat(runner.awaitDone(items.size())).isEqualTo(true);
     assertThat(runner.runCount()).isEqualTo(items.size());
   }
@@ -415,21 +416,15 @@
     final AtomicBoolean closed = new AtomicBoolean(false);
     Object closeRecorder =
         new Object() {
+          @SuppressWarnings("unused") // Called via reflection
           public void close() {
             closed.set(true);
           }
         };
-    Stream<String> stream =
-        ForwardingProxy.create(
-            Stream.class,
-            items.stream(),
-            new Object() {
-              public void close() {
-                closed.set(true);
-              }
-            });
+    @SuppressWarnings("unchecked") // Stream.class is converted to Stream<String>.class
+    Stream<String> stream = ForwardingProxy.create(Stream.class, items.stream(), closeRecorder);
 
-    new ChainedScheduler.StreamScheduler(executor, stream, runner);
+    new ChainedScheduler.StreamScheduler<>(executor, stream, runner);
     assertThat(closed.get()).isEqualTo(false);
 
     // Since there is only a single thread, the Stream cannot get closed before this runs
diff --git a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
index dbd538e..a1f61fe 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/replication/ForwardingProxy.java
@@ -70,11 +70,12 @@
     }
   }
 
+  @SuppressWarnings("unchecked") // newProxyInstance returns Object
   public static <T> T create(Class<T> toProxy, T delegate, Object overrider) {
     return (T)
         Proxy.newProxyInstance(
             delegate.getClass().getClassLoader(),
             new Class[] {toProxy},
-            new Handler(delegate, overrider));
+            new Handler<>(delegate, overrider));
   }
 }