Avoid race condition between indexing threads

When more than one thread try to index a change in the target instance,
it could be possible that, because of a race condition, the last thread
writes an outdated status to the index.

Synchronize the threads indexing the same change while still allowing
parallelization between threads indexing different changes.

Change-Id: Ic47c9d220f124c096e5e66bb46d13a9a7b0f51e7
diff --git a/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java
index 2b56bf6..0a7eb2c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java
@@ -21,7 +21,6 @@
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.reviewdb.server.ReviewDb;
 import com.google.gerrit.server.index.change.ChangeIndexer;
-import com.google.gerrit.server.project.NoSuchChangeException;
 import com.google.gwtorm.server.OrmException;
 import com.google.gwtorm.server.SchemaFactory;
 import com.google.inject.Inject;
@@ -31,6 +30,9 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -42,6 +44,8 @@
   private static final long serialVersionUID = -1L;
   private static final Logger logger =
       LoggerFactory.getLogger(SyncIndexRestApiServlet.class);
+  private static final Map<Change.Id, AtomicInteger> changeIdLocks =
+      new HashMap<>();
 
   private final ChangeIndexer indexer;
   private final SchemaFactory<ReviewDb> schemaFactory;
@@ -56,50 +60,79 @@
   @Override
   protected void doPost(HttpServletRequest req, HttpServletResponse rsp)
       throws IOException, ServletException {
-    rsp.setContentType("text/plain");
-    rsp.setCharacterEncoding("UTF-8");
-    Change.Id id = getIdFromRequest(req.getPathInfo());
-
-    try (ReviewDb db = schemaFactory.open()) {
-      Context.setForwardedEvent(true);
-      Change change = db.changes().get(id);
-      if (change == null) {
-        throw new NoSuchChangeException(id);
-      }
-      indexer.index(db, change);
-      rsp.setStatus(SC_NO_CONTENT);
-    } catch (IOException e) {
-      rsp.sendError(SC_CONFLICT, e.getMessage());
-      logger.error("Unable to update index", e);
-    } catch (OrmException | NoSuchChangeException e) {
-      rsp.sendError(SC_NOT_FOUND, "Change not found\n");
-      logger.debug("Error trying to find a change ", e);
-    } finally {
-      Context.unsetForwardedEvent();
-    }
+    process(req, rsp, "index");
   }
 
   @Override
   protected void doDelete(HttpServletRequest req, HttpServletResponse rsp)
       throws IOException, ServletException {
+    process(req, rsp, "delete");
+  }
+
+  private void process(HttpServletRequest req, HttpServletResponse rsp,
+      String operation) throws IOException {
     rsp.setContentType("text/plain");
     rsp.setCharacterEncoding("UTF-8");
-    Change.Id id = getIdFromRequest(req.getPathInfo());
-
+    String path = req.getPathInfo();
+    String changeId = path.substring(path.lastIndexOf('/') + 1);
+    Change.Id id = Change.Id.parse(changeId);
     try {
       Context.setForwardedEvent(true);
-      indexer.delete(id);
+      index(id, operation);
       rsp.setStatus(SC_NO_CONTENT);
     } catch (IOException e) {
       rsp.sendError(SC_CONFLICT, e.getMessage());
       logger.error("Unable to update index", e);
+    } catch (OrmException e) {
+      String msg = "Error trying to find a change \n";
+      rsp.sendError(SC_NOT_FOUND, msg);
+      logger.debug(msg, e);
     } finally {
       Context.unsetForwardedEvent();
     }
   }
 
-  private Change.Id getIdFromRequest(String path) {
-    String changeId = path.substring(path.lastIndexOf('/') + 1);
-    return Change.Id.parse(changeId);
+  private void index(Change.Id id, String operation)
+      throws IOException, OrmException {
+    AtomicInteger changeIdLock = getAndIncrementChangeIdLock(id);
+    synchronized (changeIdLock) {
+      if ("index".equals(operation)) {
+        try (ReviewDb db = schemaFactory.open()) {
+          Change change = db.changes().get(id);
+          if (change == null) {
+            indexer.delete(id);
+            return;
+          }
+          indexer.index(db, change);
+        }
+        logger.debug("Change {} successfully indexed", id);
+      }
+      if ("delete".equals(operation)) {
+        indexer.delete(id);
+        logger.debug("Change {} successfully deleted from index", id);
+      }
+    }
+    if (changeIdLock.decrementAndGet() == 0) {
+      removeChangeIdLock(id);
+    }
+  }
+
+  private AtomicInteger getAndIncrementChangeIdLock(Change.Id id) {
+    synchronized (changeIdLocks) {
+      AtomicInteger changeIdLock = changeIdLocks.get(id);
+      if (changeIdLock == null) {
+        changeIdLock = new AtomicInteger(1);
+        changeIdLocks.put(id, changeIdLock);
+      } else {
+        changeIdLock.incrementAndGet();
+      }
+      return changeIdLock;
+    }
+  }
+
+  private void removeChangeIdLock(Change.Id id) {
+    synchronized (changeIdLocks) {
+      changeIdLocks.remove(id);
+    }
   }
 }