Avoid race condition between indexing threads
When more than one thread try to index a change in the target instance,
it could be possible that, because of a race condition, the last thread
writes an outdated status to the index.
Synchronize the threads indexing the same change while still allowing
parallelization between threads indexing different changes.
Change-Id: Ic47c9d220f124c096e5e66bb46d13a9a7b0f51e7
diff --git a/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java
index 2b56bf6..0a7eb2c 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/syncindex/SyncIndexRestApiServlet.java
@@ -21,7 +21,6 @@
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.index.change.ChangeIndexer;
-import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
@@ -31,6 +30,9 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
@@ -42,6 +44,8 @@
private static final long serialVersionUID = -1L;
private static final Logger logger =
LoggerFactory.getLogger(SyncIndexRestApiServlet.class);
+ private static final Map<Change.Id, AtomicInteger> changeIdLocks =
+ new HashMap<>();
private final ChangeIndexer indexer;
private final SchemaFactory<ReviewDb> schemaFactory;
@@ -56,50 +60,79 @@
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse rsp)
throws IOException, ServletException {
- rsp.setContentType("text/plain");
- rsp.setCharacterEncoding("UTF-8");
- Change.Id id = getIdFromRequest(req.getPathInfo());
-
- try (ReviewDb db = schemaFactory.open()) {
- Context.setForwardedEvent(true);
- Change change = db.changes().get(id);
- if (change == null) {
- throw new NoSuchChangeException(id);
- }
- indexer.index(db, change);
- rsp.setStatus(SC_NO_CONTENT);
- } catch (IOException e) {
- rsp.sendError(SC_CONFLICT, e.getMessage());
- logger.error("Unable to update index", e);
- } catch (OrmException | NoSuchChangeException e) {
- rsp.sendError(SC_NOT_FOUND, "Change not found\n");
- logger.debug("Error trying to find a change ", e);
- } finally {
- Context.unsetForwardedEvent();
- }
+ process(req, rsp, "index");
}
@Override
protected void doDelete(HttpServletRequest req, HttpServletResponse rsp)
throws IOException, ServletException {
+ process(req, rsp, "delete");
+ }
+
+ private void process(HttpServletRequest req, HttpServletResponse rsp,
+ String operation) throws IOException {
rsp.setContentType("text/plain");
rsp.setCharacterEncoding("UTF-8");
- Change.Id id = getIdFromRequest(req.getPathInfo());
-
+ String path = req.getPathInfo();
+ String changeId = path.substring(path.lastIndexOf('/') + 1);
+ Change.Id id = Change.Id.parse(changeId);
try {
Context.setForwardedEvent(true);
- indexer.delete(id);
+ index(id, operation);
rsp.setStatus(SC_NO_CONTENT);
} catch (IOException e) {
rsp.sendError(SC_CONFLICT, e.getMessage());
logger.error("Unable to update index", e);
+ } catch (OrmException e) {
+ String msg = "Error trying to find a change \n";
+ rsp.sendError(SC_NOT_FOUND, msg);
+ logger.debug(msg, e);
} finally {
Context.unsetForwardedEvent();
}
}
- private Change.Id getIdFromRequest(String path) {
- String changeId = path.substring(path.lastIndexOf('/') + 1);
- return Change.Id.parse(changeId);
+ private void index(Change.Id id, String operation)
+ throws IOException, OrmException {
+ AtomicInteger changeIdLock = getAndIncrementChangeIdLock(id);
+ synchronized (changeIdLock) {
+ if ("index".equals(operation)) {
+ try (ReviewDb db = schemaFactory.open()) {
+ Change change = db.changes().get(id);
+ if (change == null) {
+ indexer.delete(id);
+ return;
+ }
+ indexer.index(db, change);
+ }
+ logger.debug("Change {} successfully indexed", id);
+ }
+ if ("delete".equals(operation)) {
+ indexer.delete(id);
+ logger.debug("Change {} successfully deleted from index", id);
+ }
+ }
+ if (changeIdLock.decrementAndGet() == 0) {
+ removeChangeIdLock(id);
+ }
+ }
+
+ private AtomicInteger getAndIncrementChangeIdLock(Change.Id id) {
+ synchronized (changeIdLocks) {
+ AtomicInteger changeIdLock = changeIdLocks.get(id);
+ if (changeIdLock == null) {
+ changeIdLock = new AtomicInteger(1);
+ changeIdLocks.put(id, changeIdLock);
+ } else {
+ changeIdLock.incrementAndGet();
+ }
+ return changeIdLock;
+ }
+ }
+
+ private void removeChangeIdLock(Change.Id id) {
+ synchronized (changeIdLocks) {
+ changeIdLocks.remove(id);
+ }
}
}