Extract the PushOp logic from PushQueue

There is no reason for this to be wedged inside of the PushQueue,
using static methods.  Its cleaner to make the PushOp actually
be the Runnable we inject into the WorkQueue, and let it manage
the push via its own instance members.  Consequently, it is now
a proper top level class.

Signed-off-by: Shawn O. Pearce <sop@google.com>
diff --git a/src/main/java/com/google/gerrit/git/PushOp.java b/src/main/java/com/google/gerrit/git/PushOp.java
new file mode 100644
index 0000000..ab9224f
--- /dev/null
+++ b/src/main/java/com/google/gerrit/git/PushOp.java
@@ -0,0 +1,187 @@
+// Copyright (C) 2009 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.git;
+
+import com.google.gerrit.server.GerritServer;
+import com.google.gwtjsonrpc.server.XsrfException;
+import com.google.gwtorm.client.OrmException;
+
+import com.jcraft.jsch.JSchException;
+
+import org.slf4j.Logger;
+import org.spearce.jgit.errors.NoRemoteRepositoryException;
+import org.spearce.jgit.errors.NotSupportedException;
+import org.spearce.jgit.errors.TransportException;
+import org.spearce.jgit.lib.NullProgressMonitor;
+import org.spearce.jgit.lib.Repository;
+import org.spearce.jgit.transport.PushResult;
+import org.spearce.jgit.transport.RefSpec;
+import org.spearce.jgit.transport.RemoteConfig;
+import org.spearce.jgit.transport.RemoteRefUpdate;
+import org.spearce.jgit.transport.Transport;
+import org.spearce.jgit.transport.URIish;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A push to remote operation started by {@link PushQueue}.
+ * <p>
+ * Instance members are protected by the lock within PushQueue. Callers must
+ * take that lock to ensure they are working with a current view of the object.
+ */
+class PushOp implements Runnable {
+  private static final Logger log = PushQueue.log;
+
+  private final Set<String> delta = new HashSet<String>();
+  private final String projectName;
+  private final RemoteConfig config;
+  private final URIish uri;
+
+  PushOp(final String d, final RemoteConfig c, final URIish u) {
+    projectName = d;
+    config = c;
+    uri = u;
+  }
+
+  URIish getURI() {
+    return uri;
+  }
+
+  void addRef(final String ref) {
+    delta.add(ref);
+  }
+
+  public void run() {
+    try {
+      // Lock the queue, and remove ourselves, so we can't be modified once
+      // we start replication (instead a new instance, with the same URI, is
+      // created and scheduled for a future point in time.)
+      //
+      PushQueue.notifyStarting(this);
+      pushImpl();
+    } catch (RuntimeException e) {
+      log.error("Unexpected error during replication", e);
+    } catch (Error e) {
+      log.error("Unexpected error during replication", e);
+    }
+  }
+
+  private void pushImpl() {
+    final Repository db;
+    try {
+      db = GerritServer.getInstance().getRepositoryCache().get(projectName);
+    } catch (OrmException e) {
+      log.error("Cannot open repository cache", e);
+      return;
+    } catch (XsrfException e) {
+      log.error("Cannot open repository cache", e);
+      return;
+    } catch (InvalidRepositoryException e) {
+      log.error("Cannot replicate " + projectName, e);
+      return;
+    }
+
+    final Transport tn;
+    try {
+      tn = Transport.open(db, uri);
+      tn.applyConfig(config);
+    } catch (NotSupportedException e) {
+      log.error("Cannot replicate to " + uri, e);
+      return;
+    }
+
+    final PushResult res;
+    try {
+      res = tn.push(NullProgressMonitor.INSTANCE, computeUpdates(db));
+    } catch (NoRemoteRepositoryException e) {
+      log.error("Cannot replicate to " + uri + "; repository not found");
+      return;
+    } catch (NotSupportedException e) {
+      log.error("Cannot replicate to " + uri, e);
+      return;
+    } catch (TransportException e) {
+      final Throwable cause = e.getCause();
+      if (cause instanceof JSchException
+          && cause.getMessage().startsWith("UnknownHostKey:")) {
+        log.error("Cannot replicate to " + uri + ": " + cause.getMessage());
+        return;
+      }
+      log.error("Cannot replicate to " + uri, e);
+      return;
+    } catch (IOException e) {
+      log.error("Cannot replicate to " + uri, e);
+      return;
+    } finally {
+      try {
+        tn.close();
+      } catch (Throwable e2) {
+        log.warn("Unexpected error while closing " + uri, e2);
+      }
+    }
+
+    for (final RemoteRefUpdate u : res.getRemoteUpdates()) {
+      switch (u.getStatus()) {
+        case OK:
+        case UP_TO_DATE:
+        case NON_EXISTING:
+          break;
+
+        case NOT_ATTEMPTED:
+        case AWAITING_REPORT:
+        case REJECTED_NODELETE:
+        case REJECTED_NONFASTFORWARD:
+        case REJECTED_REMOTE_CHANGED:
+          log.error("Failed replicate of " + u.getRemoteName() + " to " + uri
+              + ": status " + u.getStatus().name());
+          break;
+
+        case REJECTED_OTHER_REASON:
+          log.error("Failed replicate of " + u.getRemoteName() + " to " + uri
+              + ", reason: " + u.getMessage());
+          break;
+      }
+    }
+  }
+
+  private List<RemoteRefUpdate> computeUpdates(final Repository db)
+      throws IOException {
+    final ArrayList<RemoteRefUpdate> cmds = new ArrayList<RemoteRefUpdate>();
+    for (final String ref : delta) {
+      final String src = ref;
+      RefSpec spec = null;
+      for (final RefSpec s : config.getPushRefSpecs()) {
+        if (s.matchSource(src)) {
+          spec = s.expandFromSource(src);
+          break;
+        }
+      }
+      if (spec == null) {
+        continue;
+      }
+
+      // If the ref still exists locally, send it, else delete it.
+      //
+      final String srcexp = db.resolve(src) != null ? src : null;
+      final String dst = spec.getDestination();
+      final boolean force = spec.isForceUpdate();
+      cmds.add(new RemoteRefUpdate(db, srcexp, dst, force, null, null));
+    }
+    return cmds;
+  }
+}
diff --git a/src/main/java/com/google/gerrit/git/PushQueue.java b/src/main/java/com/google/gerrit/git/PushQueue.java
index 7c6c035..0e35219 100644
--- a/src/main/java/com/google/gerrit/git/PushQueue.java
+++ b/src/main/java/com/google/gerrit/git/PushQueue.java
@@ -19,25 +19,16 @@
 import com.google.gwtjsonrpc.server.XsrfException;
 import com.google.gwtorm.client.OrmException;
 
-import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spearce.jgit.errors.NoRemoteRepositoryException;
-import org.spearce.jgit.errors.NotSupportedException;
-import org.spearce.jgit.errors.TransportException;
-import org.spearce.jgit.lib.NullProgressMonitor;
-import org.spearce.jgit.lib.Repository;
 import org.spearce.jgit.lib.RepositoryConfig;
 import org.spearce.jgit.transport.OpenSshConfig;
-import org.spearce.jgit.transport.PushResult;
 import org.spearce.jgit.transport.RefSpec;
 import org.spearce.jgit.transport.RemoteConfig;
-import org.spearce.jgit.transport.RemoteRefUpdate;
 import org.spearce.jgit.transport.SshConfigSessionFactory;
 import org.spearce.jgit.transport.SshSessionFactory;
-import org.spearce.jgit.transport.Transport;
 import org.spearce.jgit.transport.URIish;
 
 import java.io.File;
@@ -47,17 +38,15 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 public class PushQueue {
-  private static final Logger log = LoggerFactory.getLogger(PushQueue.class);
+  static final Logger log = LoggerFactory.getLogger(PushQueue.class);
   private static final int startDelay = 15; // seconds
   private static List<RemoteConfig> configs;
-  private static final Map<URIish, PushOp> active =
+  private static final Map<URIish, PushOp> pending =
       new HashMap<URIish, PushOp>();
 
   static {
@@ -95,130 +84,17 @@
 
   private static synchronized void scheduleImp(final Project.NameKey project,
       final String ref, final RemoteConfig srcConf, final URIish uri) {
-    PushOp e = active.get(uri);
+    PushOp e = pending.get(uri);
     if (e == null) {
-      final PushOp newOp = new PushOp(project.get(), srcConf, uri);
-      WorkQueue.schedule(new Runnable() {
-        public void run() {
-          try {
-            pushImpl(newOp);
-          } catch (RuntimeException e) {
-            log.error("Unexpected error during replication", e);
-          } catch (Error e) {
-            log.error("Unexpected error during replication", e);
-          }
-        }
-      }, startDelay, TimeUnit.SECONDS);
-      active.put(uri, newOp);
-      e = newOp;
+      e = new PushOp(project.get(), srcConf, uri);
+      WorkQueue.schedule(e, startDelay, TimeUnit.SECONDS);
+      pending.put(uri, e);
     }
-    e.delta.add(ref);
+    e.addRef(ref);
   }
 
-  private static void pushImpl(final PushOp op) {
-    removeFromActive(op);
-    final Repository db;
-    try {
-      db = GerritServer.getInstance().getRepositoryCache().get(op.projectName);
-    } catch (OrmException e) {
-      log.error("Cannot open repository cache", e);
-      return;
-    } catch (XsrfException e) {
-      log.error("Cannot open repository cache", e);
-      return;
-    } catch (InvalidRepositoryException e) {
-      log.error("Cannot replicate " + op.projectName, e);
-      return;
-    }
-
-    final ArrayList<RemoteRefUpdate> cmds = new ArrayList<RemoteRefUpdate>();
-    try {
-      for (final String ref : op.delta) {
-        final String src = ref;
-        RefSpec spec = null;
-        for (final RefSpec s : op.config.getPushRefSpecs()) {
-          if (s.matchSource(src)) {
-            spec = s.expandFromSource(src);
-            break;
-          }
-        }
-        if (spec == null) {
-          continue;
-        }
-
-        // If the ref still exists locally, send it, else delete it.
-        //
-        final String srcexp = db.resolve(src) != null ? src : null;
-        final String dst = spec.getDestination();
-        final boolean force = spec.isForceUpdate();
-        cmds.add(new RemoteRefUpdate(db, srcexp, dst, force, null, null));
-      }
-    } catch (IOException e) {
-      log.error("Cannot replicate " + op.projectName, e);
-      return;
-    }
-
-    final Transport tn;
-    try {
-      tn = Transport.open(db, op.uri);
-      tn.applyConfig(op.config);
-    } catch (NotSupportedException e) {
-      log.error("Cannot replicate to " + op.uri, e);
-      return;
-    }
-
-    final PushResult res;
-    try {
-      res = tn.push(NullProgressMonitor.INSTANCE, cmds);
-    } catch (NoRemoteRepositoryException e) {
-      log.error("Cannot replicate to " + op.uri + "; repository not found");
-      return;
-    } catch (NotSupportedException e) {
-      log.error("Cannot replicate to " + op.uri, e);
-      return;
-    } catch (TransportException e) {
-      final Throwable cause = e.getCause();
-      if (cause instanceof JSchException
-          && cause.getMessage().startsWith("UnknownHostKey:")) {
-        log.error("Cannot replicate to " + op.uri + ": " + cause.getMessage());
-        return;
-      }
-      log.error("Cannot replicate to " + op.uri, e);
-      return;
-    } finally {
-      try {
-        tn.close();
-      } catch (Throwable e2) {
-        log.warn("Unexpected error while closing " + op.uri, e2);
-      }
-    }
-
-    for (final RemoteRefUpdate u : res.getRemoteUpdates()) {
-      switch (u.getStatus()) {
-        case OK:
-        case UP_TO_DATE:
-        case NON_EXISTING:
-          break;
-
-        case NOT_ATTEMPTED:
-        case AWAITING_REPORT:
-        case REJECTED_NODELETE:
-        case REJECTED_NONFASTFORWARD:
-        case REJECTED_REMOTE_CHANGED:
-          log.error("Failed replicate of " + u.getRemoteName() + " to "
-              + op.uri + ": status " + u.getStatus().name());
-          break;
-
-        case REJECTED_OTHER_REASON:
-          log.error("Failed replicate of " + u.getRemoteName() + " to "
-              + op.uri + ", reason: " + u.getMessage());
-          break;
-      }
-    }
-  }
-
-  private static synchronized void removeFromActive(final PushOp op) {
-    active.remove(op.uri);
+  static synchronized void notifyStarting(final PushOp op) {
+    pending.remove(op.getURI());
   }
 
   private static String replace(final String pat, final String key,
@@ -282,17 +158,4 @@
     }
     return configs;
   }
-
-  private static class PushOp {
-    final Set<String> delta = new HashSet<String>();
-    final String projectName;
-    final RemoteConfig config;
-    final URIish uri;
-
-    PushOp(final String d, final RemoteConfig c, final URIish u) {
-      projectName = d;
-      config = c;
-      uri = u;
-    }
-  }
 }