Retry change reindex because of NFS access caching

Any edit of ChangeNotes generate loose objects and refs in the underlying
Git repo, which is then subject to delays caused by NFS attribute caching.

As a consequence, the reindexing events between nodes won't be effective
because the secondary node may not see the updated refs, even if the
files are effectively accessible and modified.

Allow configuring an automatic retry mechanism for the forwarded node
to detect and reschedule the indexing of stale changes.

Detection of a stale change relies on:
- Last update of the Change Notes (either ReviewDb or NoteDb)
- Maximum of the draft change messages timestamp
- Commit SHA of the target branch of the Change

It is explicitly out of scope the detection of the manual reindexing
of changes.

Bug: Issue 9078
Change-Id: I46a2b8390a79b79e7d877a27743b673325e75da5
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
index d4c1272..edd96d7 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/Configuration.java
@@ -51,6 +51,8 @@
 
   // common parameters to cache and index sections
   static final String THREAD_POOL_SIZE_KEY = "threadPoolSize";
+  static final int DEFAULT_INDEX_MAX_TRIES = 2;
+  static final int DEFAULT_INDEX_RETRY_INTERVAL = 30000;
   static final int DEFAULT_THREAD_POOL_SIZE = 4;
   static final String NUM_STRIPED_LOCKS = "numStripedLocks";
   static final int DEFAULT_NUM_STRIPED_LOCKS = 10;
@@ -446,8 +448,12 @@
 
   public static class Index extends Forwarding {
     static final String INDEX_SECTION = "index";
+    static final String MAX_TRIES_KEY = "maxTries";
+    static final String RETRY_INTERVAL_KEY = "retryInterval";
 
     private final int threadPoolSize;
+    private final int retryInterval;
+    private final int maxTries;
 
     private final int numStripedLocks;
 
@@ -455,6 +461,8 @@
       super(cfg, INDEX_SECTION);
       threadPoolSize = getInt(cfg, INDEX_SECTION, THREAD_POOL_SIZE_KEY, DEFAULT_THREAD_POOL_SIZE);
       numStripedLocks = getInt(cfg, INDEX_SECTION, NUM_STRIPED_LOCKS, DEFAULT_NUM_STRIPED_LOCKS);
+      retryInterval = getInt(cfg, INDEX_SECTION, RETRY_INTERVAL_KEY, DEFAULT_INDEX_RETRY_INTERVAL);
+      maxTries = getInt(cfg, INDEX_SECTION, MAX_TRIES_KEY, DEFAULT_INDEX_MAX_TRIES);
     }
 
     public int threadPoolSize() {
@@ -464,6 +472,14 @@
     public int numStripedLocks() {
       return numStripedLocks;
     }
+
+    public int retryInterval() {
+      return retryInterval;
+    }
+
+    public int maxTries() {
+      return maxTries;
+    }
   }
 
   public static class Websession extends Forwarding {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
index 597034d..3d3212b 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/ExecutorProvider.java
@@ -17,11 +17,11 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.inject.Provider;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
-public abstract class ExecutorProvider implements Provider<Executor>, LifecycleListener {
-  private ExecutorService executor;
+public abstract class ExecutorProvider
+    implements Provider<ScheduledExecutorService>, LifecycleListener {
+  private ScheduledExecutorService executor;
 
   protected ExecutorProvider(WorkQueue workQueue, int threadPoolSize, String threadNamePrefix) {
     executor = workQueue.createQueue(threadPoolSize, threadNamePrefix);
@@ -39,7 +39,7 @@
   }
 
   @Override
-  public Executor get() {
+  public ScheduledExecutorService get() {
     return executor;
   }
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
index ee986c5..f7ea962 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
@@ -15,6 +15,10 @@
 package com.ericsson.gerrit.plugins.highavailability.forwarder;
 
 import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.Configuration.Index;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeChecker;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl;
+import com.ericsson.gerrit.plugins.highavailability.index.ForwardedIndexExecutor;
 import com.google.common.base.Splitter;
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.reviewdb.server.ReviewDb;
@@ -22,12 +26,16 @@
 import com.google.gerrit.server.index.change.ChangeIndexer;
 import com.google.gerrit.server.notedb.ChangeNotes;
 import com.google.gerrit.server.project.NoSuchChangeException;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gwtorm.server.OrmException;
 import com.google.gwtorm.server.SchemaFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Index a change using {@link ChangeIndexer}. This class is meant to be used on the receiving side
@@ -39,41 +47,106 @@
 public class ForwardedIndexChangeHandler extends ForwardedIndexingHandler<String> {
   private final ChangeIndexer indexer;
   private final SchemaFactory<ReviewDb> schemaFactory;
-  private final ChangeFinder changeFinder;
+  private final ScheduledExecutorService indexExecutor;
+  private final OneOffRequestContext oneOffCtx;
+  private final int retryInterval;
+  private final int maxTries;
+  private final ChangeCheckerImpl.Factory changeCheckerFactory;
 
   @Inject
   ForwardedIndexChangeHandler(
       ChangeIndexer indexer,
       SchemaFactory<ReviewDb> schemaFactory,
       ChangeFinder changeFinder,
-      Configuration config) {
+      Configuration config,
+      @ForwardedIndexExecutor ScheduledExecutorService indexExecutor,
+      OneOffRequestContext oneOffCtx,
+      ChangeCheckerImpl.Factory changeCheckerFactory) {
     super(config.index());
     this.indexer = indexer;
     this.schemaFactory = schemaFactory;
-    this.changeFinder = changeFinder;
+    this.indexExecutor = indexExecutor;
+    this.oneOffCtx = oneOffCtx;
+    this.changeCheckerFactory = changeCheckerFactory;
+
+    Index indexConfig = config.index();
+    this.retryInterval = indexConfig != null ? indexConfig.retryInterval() : 0;
+    this.maxTries = indexConfig != null ? indexConfig.maxTries() : 0;
   }
 
   @Override
   protected void doIndex(String id, Optional<IndexEvent> indexEvent)
       throws IOException, OrmException {
-    ChangeNotes change = null;
+    doIndex(id, indexEvent, 0);
+  }
+
+  private void doIndex(String id, Optional<IndexEvent> indexEvent, int retryCount)
+      throws IOException, OrmException {
     try (ReviewDb db = schemaFactory.open()) {
-      change = changeFinder.findOne(id);
-      if (change != null) {
-        change.reload();
-        indexer.index(db, change.getChange());
-        log.debug("Change {} successfully indexed", id);
+      ChangeChecker checker = changeCheckerFactory.create(id);
+      Optional<ChangeNotes> changeNotes = checker.getChangeNotes();
+      if (changeNotes.isPresent()) {
+        ChangeNotes notes = changeNotes.get();
+        notes.reload();
+        indexer.index(db, notes.getChange());
+
+        if (checker.isChangeUpToDate(indexEvent)) {
+          if (retryCount > 0) {
+            log.warn("Change {} has been eventually indexed after {} attempt(s)", id, retryCount);
+          } else {
+            log.debug("Change {} successfully indexed", id);
+          }
+        } else {
+          log.warn(
+              "Change {} seems too old compared to the event timestamp (event-Ts={} >> change-Ts={})",
+              id,
+              indexEvent,
+              checker);
+          rescheduleIndex(id, indexEvent, retryCount + 1);
+        }
+      } else {
+        indexer.delete(parseChangeId(id));
+        log.warn(
+            "Change {} could not be found in the local Git repository (eventTs={}), deleted from index",
+            id,
+            indexEvent);
       }
     } catch (Exception e) {
-      if (!isCausedByNoSuchChangeException(e)) {
-        throw e;
+      if (isCausedByNoSuchChangeException(e)) {
+        indexer.delete(parseChangeId(id));
+        log.warn("Error trying to index Change {}. Deleted from index", id, e);
+        return;
       }
-      log.debug("Change {} was deleted, aborting forwarded indexing the change.", id);
+
+      throw e;
     }
-    if (change == null) {
-      indexer.delete(parseChangeId(id));
-      log.debug("Change {} not found, deleted from index", id);
+  }
+
+  private void rescheduleIndex(String id, Optional<IndexEvent> indexEvent, int retryCount) {
+    if (retryCount > maxTries) {
+      log.error(
+          "Change {} could not be indexed after {} retries. Change index could be stale.",
+          id,
+          retryCount);
+      return;
     }
+
+    log.warn(
+        "Retrying for the #{} time to index Change {} after {} msecs",
+        retryCount,
+        id,
+        retryInterval);
+    indexExecutor.schedule(
+        () -> {
+          try (ManualRequestContext ctx = oneOffCtx.open()) {
+            Context.setForwardedEvent(true);
+            doIndex(id, indexEvent, retryCount);
+          } catch (Exception e) {
+            log.warn("Change {} could not be indexed", id, e);
+          }
+        },
+        retryInterval,
+        TimeUnit.MILLISECONDS);
   }
 
   @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
index f6f24a7..037c1c6 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
@@ -20,11 +20,15 @@
 
 public class IndexEvent {
   public long eventCreatedOn = System.currentTimeMillis() / 1000;
+  public String targetSha;
 
   @Override
   public String toString() {
-    return "IndexEvent@"
-        + LocalDateTime.ofEpochSecond(eventCreatedOn, 0, ZoneOffset.UTC)
-            .format(DateTimeFormatter.ISO_DATE_TIME);
+    return "IndexEvent@" + format(eventCreatedOn) + ((targetSha != null) ? "/" + targetSha : "");
+  }
+
+  public static String format(long eventTs) {
+    return LocalDateTime.ofEpochSecond(eventTs, 0, ZoneOffset.UTC)
+        .format(DateTimeFormatter.ISO_DATE_TIME);
   }
 }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java
new file mode 100644
index 0000000..9b2bddf
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java
@@ -0,0 +1,63 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gwtorm.server.OrmException;
+import java.io.IOException;
+import java.util.Optional;
+
+/** Encapsulates the logic of verifying the up-to-date status of a change. */
+public interface ChangeChecker {
+
+  /**
+   * Return the Change notes read from ReviewDb or NoteDb.
+   *
+   * @return notes of the Change
+   * @throws OrmException if ReviewDb or NoteDb cannot be opened
+   */
+  public Optional<ChangeNotes> getChangeNotes() throws OrmException;
+
+  /**
+   * Create a new index event POJO associated with the current Change.
+   *
+   * @return new IndexEvent
+   * @throws IOException if the current Change cannot read
+   * @throws OrmException if ReviewDb cannot be opened
+   */
+  public Optional<IndexEvent> newIndexEvent() throws IOException, OrmException;
+
+  /**
+   * Check if the local Change is aligned with the indexEvent received.
+   *
+   * @param indexEvent indexing event
+   * @return true if the local Change is up-to-date, false otherwise.
+   * @throws IOException if an I/O error occurred while reading the local Change
+   * @throws OrmException if the local ReviewDb cannot be opened
+   */
+  public boolean isChangeUpToDate(Optional<IndexEvent> indexEvent) throws IOException, OrmException;
+
+  /**
+   * Return the last computed up-to-date Change time-stamp.
+   *
+   * <p>Compute the up-to-date Change time-stamp when it is invoked for the very first time.
+   *
+   * @return the Change timestamp epoch in seconds
+   * @throws IOException if an I/O error occurred while reading the local Change
+   * @throws OrmException if the local ReviewDb cannot be opened
+   */
+  public Optional<Long> getComputedChangeTs() throws IOException, OrmException;
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
new file mode 100644
index 0000000..7b662dd
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
@@ -0,0 +1,168 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.Comment;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.ChangeFinder;
+import com.google.gerrit.server.CommentsUtil;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.util.ManualRequestContext;
+import com.google.gerrit.server.util.OneOffRequestContext;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.Objects;
+import java.util.Optional;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.Repository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ChangeCheckerImpl implements ChangeChecker {
+  private static final Logger log = LoggerFactory.getLogger(ChangeCheckerImpl.class);
+  private final GitRepositoryManager gitRepoMgr;
+  private final CommentsUtil commentsUtil;
+  private final SchemaFactory<ReviewDb> schemaFactory;
+  private final OneOffRequestContext oneOffReqCtx;
+  private final String changeId;
+  private final ChangeFinder changeFinder;
+  private Optional<Long> computedChangeTs = Optional.empty();
+  private Optional<ChangeNotes> changeNotes = Optional.empty();
+
+  public interface Factory {
+    public ChangeChecker create(String changeId);
+  }
+
+  @Inject
+  public ChangeCheckerImpl(
+      GitRepositoryManager gitRepoMgr,
+      CommentsUtil commentsUtil,
+      SchemaFactory<ReviewDb> schemaFactory,
+      ChangeFinder changeFinder,
+      OneOffRequestContext oneOffReqCtx,
+      @Assisted String changeId) {
+    this.changeFinder = changeFinder;
+    this.gitRepoMgr = gitRepoMgr;
+    this.commentsUtil = commentsUtil;
+    this.schemaFactory = schemaFactory;
+    this.oneOffReqCtx = oneOffReqCtx;
+    this.changeId = changeId;
+  }
+
+  @Override
+  public Optional<IndexEvent> newIndexEvent() throws IOException, OrmException {
+    return getComputedChangeTs()
+        .map(
+            ts -> {
+              IndexEvent event = new IndexEvent();
+              event.eventCreatedOn = ts;
+              event.targetSha = getBranchTargetSha();
+              return event;
+            });
+  }
+
+  private String getBranchTargetSha() {
+    try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName())) {
+      String refName = changeNotes.get().getChange().getDest().get();
+      Ref ref = repo.findRef(refName);
+      if (ref == null) {
+        log.warn("Unable to find target ref {} for change {}", refName, changeId);
+        return null;
+      }
+      return ref.getTarget().getObjectId().getName();
+    } catch (IOException e) {
+      log.warn("Unable to resolve target branch SHA for change {}", changeId, e);
+      return null;
+    }
+  }
+
+  @Override
+  public Optional<ChangeNotes> getChangeNotes() throws OrmException {
+    try (ManualRequestContext ctx = oneOffReqCtx.open()) {
+      changeNotes = Optional.ofNullable(changeFinder.findOne(changeId));
+      return changeNotes;
+    }
+  }
+
+  @Override
+  public boolean isChangeUpToDate(Optional<IndexEvent> indexEvent)
+      throws IOException, OrmException {
+    getComputedChangeTs();
+    log.debug("Checking change {} against index event {}", this, indexEvent);
+    if (!computedChangeTs.isPresent()) {
+      log.warn("Unable to compute last updated ts for change {}", changeId);
+      return false;
+    }
+
+    String targetSha = getBranchTargetSha();
+    return indexEvent
+        .map(
+            e ->
+                (computedChangeTs.get() > e.eventCreatedOn)
+                    || (computedChangeTs.get() == e.eventCreatedOn)
+                        && (Objects.equals(targetSha, e.targetSha)))
+        .orElse(true);
+  }
+
+  @Override
+  public Optional<Long> getComputedChangeTs() throws IOException, OrmException {
+    if (!computedChangeTs.isPresent()) {
+      computedChangeTs = computeLastChangeTs();
+    }
+    return computedChangeTs;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return "change-id="
+          + changeId
+          + "@"
+          + getComputedChangeTs().map(IndexEvent::format)
+          + "/"
+          + getBranchTargetSha();
+    } catch (IOException | OrmException e) {
+      log.error("Unable to render change {}", changeId, e);
+      return "change-id=" + changeId;
+    }
+  }
+
+  private Optional<Long> computeLastChangeTs() throws OrmException {
+    try (ReviewDb db = schemaFactory.open()) {
+      return getChangeNotes().map(notes -> getTsFromChangeAndDraftComments(db, notes));
+    }
+  }
+
+  private long getTsFromChangeAndDraftComments(ReviewDb db, ChangeNotes notes) {
+    Change change = notes.getChange();
+    Timestamp changeTs = change.getLastUpdatedOn();
+    try {
+      for (Comment comment : commentsUtil.draftByChange(db, changeNotes.get())) {
+        Timestamp commentTs = comment.writtenOn;
+        changeTs = commentTs.after(changeTs) ? commentTs : changeTs;
+      }
+    } catch (OrmException e) {
+      log.warn("Unable to access draft comments for change {}", change, e);
+    }
+    return changeTs.getTime() / 1000;
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutor.java
new file mode 100644
index 0000000..44c84dc
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutor.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+import java.lang.annotation.Retention;
+
+@Retention(RUNTIME)
+@BindingAnnotation
+public @interface ForwardedIndexExecutor {}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
new file mode 100644
index 0000000..2112dbe
--- /dev/null
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ForwardedIndexExecutorProvider.java
@@ -0,0 +1,30 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.ericsson.gerrit.plugins.highavailability.index;
+
+import com.ericsson.gerrit.plugins.highavailability.Configuration;
+import com.ericsson.gerrit.plugins.highavailability.ExecutorProvider;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+class ForwardedIndexExecutorProvider extends ExecutorProvider {
+
+  @Inject
+  ForwardedIndexExecutorProvider(WorkQueue workQueue, Configuration config) {
+    super(workQueue, config.index().threadPoolSize(), "Forwarded-Index-Event");
+  }
+}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
index 106422d..3010689 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandler.java
@@ -27,20 +27,28 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class IndexEventHandler
     implements ChangeIndexedListener, AccountIndexedListener, GroupIndexedListener {
+  private static final Logger log = LoggerFactory.getLogger(IndexEventHandler.class);
   private final Executor executor;
   private final Forwarder forwarder;
   private final String pluginName;
   private final Set<IndexTask> queuedTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
+  private final ChangeCheckerImpl.Factory changeChecker;
 
   @Inject
   IndexEventHandler(
-      @IndexExecutor Executor executor, @PluginName String pluginName, Forwarder forwarder) {
+      @IndexExecutor Executor executor,
+      @PluginName String pluginName,
+      Forwarder forwarder,
+      ChangeCheckerImpl.Factory changeChecker) {
     this.forwarder = forwarder;
     this.executor = executor;
     this.pluginName = pluginName;
+    this.changeChecker = changeChecker;
   }
 
   @Override
@@ -75,9 +83,19 @@
 
   private void executeIndexChangeTask(String projectName, int id, boolean deleted) {
     if (!Context.isForwardedEvent()) {
-      IndexChangeTask task = new IndexChangeTask(projectName, id, deleted);
-      if (queuedTasks.add(task)) {
-        executor.execute(task);
+      ChangeChecker checker = changeChecker.create(projectName + "~" + id);
+      try {
+        checker
+            .newIndexEvent()
+            .map(event -> new IndexChangeTask(projectName, id, deleted, event))
+            .ifPresent(
+                task -> {
+                  if (queuedTasks.add(task)) {
+                    executor.execute(task);
+                  }
+                });
+      } catch (Exception e) {
+        log.warn("Unable to create task to handle change {}~{}", projectName, id, e);
       }
     }
   }
@@ -89,6 +107,10 @@
       indexEvent = new IndexEvent();
     }
 
+    IndexTask(IndexEvent indexEvent) {
+      this.indexEvent = indexEvent;
+    }
+
     @Override
     public void run() {
       queuedTasks.remove(this);
@@ -103,7 +125,8 @@
     private final int changeId;
     private final String projectName;
 
-    IndexChangeTask(String projectName, int changeId, boolean deleted) {
+    IndexChangeTask(String projectName, int changeId, boolean deleted, IndexEvent indexEvent) {
+      super(indexEvent);
       this.projectName = projectName;
       this.changeId = changeId;
       this.deleted = deleted;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
index f88a806..ebf8fdf 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/IndexModule.java
@@ -19,16 +19,26 @@
 import com.google.gerrit.extensions.events.GroupIndexedListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 
 public class IndexModule extends LifecycleModule {
 
   @Override
   protected void configure() {
     bind(Executor.class).annotatedWith(IndexExecutor.class).toProvider(IndexExecutorProvider.class);
+    bind(ScheduledExecutorService.class)
+        .annotatedWith(ForwardedIndexExecutor.class)
+        .toProvider(ForwardedIndexExecutorProvider.class);
     listener().to(IndexExecutorProvider.class);
     DynamicSet.bind(binder(), ChangeIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), AccountIndexedListener.class).to(IndexEventHandler.class);
     DynamicSet.bind(binder(), GroupIndexedListener.class).to(IndexEventHandler.class);
+
+    install(
+        new FactoryModuleBuilder()
+            .implement(ChangeChecker.class, ChangeCheckerImpl.class)
+            .build(ChangeCheckerImpl.Factory.class));
   }
 }
diff --git a/src/main/resources/Documentation/config.md b/src/main/resources/Documentation/config.md
index ca5a6e5..04ccafc 100644
--- a/src/main/resources/Documentation/config.md
+++ b/src/main/resources/Documentation/config.md
@@ -195,6 +195,17 @@
 :   Maximum number of threads used to send index events to the target instance.
     Defaults to 4.
 
+```index.maxTries```
+:   Maximum number of times the plugin should attempt to reindex changes.
+    Setting this value to 0 will disable retries. After this number of failed tries,
+    an error is logged and the local index should be considered stale and needs
+    to be investigated and manually reindexed.
+    Defaults to 2.
+
+```index.retryInterval```
+:   The interval of time in milliseconds between the subsequent auto-retries.
+    Defaults to 30000 (30 seconds).
+
 ```websession.synchronize```
 :   Whether to synchronize web sessions.
     Defaults to true.
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
index d29589e..9ec0d3c 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandlerTest.java
@@ -25,17 +25,22 @@
 
 import com.ericsson.gerrit.plugins.highavailability.Configuration;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeChecker;
+import com.ericsson.gerrit.plugins.highavailability.index.ChangeCheckerImpl;
 import com.google.gerrit.common.TimeUtil;
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.reviewdb.server.ReviewDb;
 import com.google.gerrit.server.ChangeFinder;
+import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.index.change.ChangeIndexer;
 import com.google.gerrit.server.notedb.ChangeNotes;
 import com.google.gerrit.server.project.NoSuchChangeException;
+import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.gwtorm.server.OrmException;
 import com.google.gwtorm.server.SchemaFactory;
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -57,15 +62,23 @@
   private static final boolean DO_NOT_THROW_ORM_EXCEPTION = false;
   private static final boolean THROW_IO_EXCEPTION = true;
   private static final boolean THROW_ORM_EXCEPTION = true;
+  private static final boolean CHANGE_UP_TO_DATE = true;
+  private static final boolean CHANGE_OUTDATED = false;
 
   @Rule public ExpectedException exception = ExpectedException.none();
   @Mock private ChangeIndexer indexerMock;
   @Mock private SchemaFactory<ReviewDb> schemaFactoryMock;
   @Mock private ReviewDb dbMock;
-  @Mock private ChangeFinder changeFinderMock;
   @Mock private ChangeNotes changeNotes;
   @Mock private Configuration configMock;
   @Mock private Configuration.Index indexMock;
+  @Mock private ScheduledExecutorService indexExecutorMock;
+  @Mock private OneOffRequestContext ctxMock;
+  @Mock private GitRepositoryManager gitRepoMgrMock;
+  @Mock private ChangeCheckerImpl.Factory changeCheckerFactoryMock;
+  @Mock private ChangeChecker changeCheckerAbsentMock;
+  @Mock private ChangeChecker changeCheckerPresentMock;
+  @Mock private ChangeFinder changeFinderMock;
   private ForwardedIndexChangeHandler handler;
   private Change.Id id;
   private Change change;
@@ -78,19 +91,33 @@
     when(changeNotes.getChange()).thenReturn(change);
     when(configMock.index()).thenReturn(indexMock);
     when(indexMock.numStripedLocks()).thenReturn(10);
+    when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerAbsentMock);
     handler =
         new ForwardedIndexChangeHandler(
-            indexerMock, schemaFactoryMock, changeFinderMock, configMock);
+            indexerMock,
+            schemaFactoryMock,
+            changeFinderMock,
+            configMock,
+            indexExecutorMock,
+            ctxMock,
+            changeCheckerFactoryMock);
   }
 
   @Test
-  public void changeIsIndexed() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS);
+  public void changeIsIndexedWhenUpToDate() throws Exception {
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
     verify(indexerMock, times(1)).index(any(ReviewDb.class), any(Change.class));
   }
 
   @Test
+  public void changeIsStillIndexedEvenWhenOutdated() throws Exception {
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_OUTDATED);
+    handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.of(new IndexEvent()));
+    verify(indexerMock, times(1)).index(any(ReviewDb.class), any(Change.class));
+  }
+
+  @Test
   public void changeIsDeletedFromIndex() throws Exception {
     handler.index(TEST_CHANGE_ID, Operation.DELETE, Optional.empty());
     verify(indexerMock, times(1)).delete(id);
@@ -98,14 +125,14 @@
 
   @Test
   public void changeToIndexDoesNotExist() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_DOES_NOT_EXIST);
+    setupChangeAccessRelatedMocks(CHANGE_DOES_NOT_EXIST, CHANGE_OUTDATED);
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
     verify(indexerMock, times(1)).delete(id);
   }
 
   @Test
   public void schemaThrowsExceptionWhenLookingUpForChange() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_ORM_EXCEPTION);
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, THROW_ORM_EXCEPTION, CHANGE_UP_TO_DATE);
     exception.expect(OrmException.class);
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
   }
@@ -127,14 +154,15 @@
 
   @Test
   public void indexerThrowsIOExceptionTryingToIndexChange() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS, DO_NOT_THROW_ORM_EXCEPTION, THROW_IO_EXCEPTION);
+    setupChangeAccessRelatedMocks(
+        CHANGE_EXISTS, DO_NOT_THROW_ORM_EXCEPTION, THROW_IO_EXCEPTION, CHANGE_UP_TO_DATE);
     exception.expect(IOException.class);
     handler.index(TEST_CHANGE_ID, Operation.INDEX, Optional.empty());
   }
 
   @Test
   public void shouldSetAndUnsetForwardedContext() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS);
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
     // this doAnswer is to allow to assert that context is set to forwarded
     // while cache eviction is called.
     doAnswer(
@@ -155,7 +183,7 @@
 
   @Test
   public void shouldSetAndUnsetForwardedContextEvenIfExceptionIsThrown() throws Exception {
-    setupChangeAccessRelatedMocks(CHANGE_EXISTS);
+    setupChangeAccessRelatedMocks(CHANGE_EXISTS, CHANGE_UP_TO_DATE);
     doAnswer(
             (Answer<Void>)
                 invocation -> {
@@ -177,33 +205,37 @@
     verify(indexerMock, times(1)).index(any(ReviewDb.class), any(Change.class));
   }
 
-  private void setupChangeAccessRelatedMocks(boolean changeExist) throws Exception {
+  private void setupChangeAccessRelatedMocks(boolean changeExist, boolean changeUpToDate)
+      throws Exception {
     setupChangeAccessRelatedMocks(
-        changeExist, DO_NOT_THROW_ORM_EXCEPTION, DO_NOT_THROW_IO_EXCEPTION);
-  }
-
-  private void setupChangeAccessRelatedMocks(boolean changeExist, boolean ormException)
-      throws OrmException, IOException {
-    setupChangeAccessRelatedMocks(changeExist, ormException, DO_NOT_THROW_IO_EXCEPTION);
+        changeExist, DO_NOT_THROW_ORM_EXCEPTION, DO_NOT_THROW_IO_EXCEPTION, changeUpToDate);
   }
 
   private void setupChangeAccessRelatedMocks(
-      boolean changeExists, boolean ormException, boolean ioException)
+      boolean changeExist, boolean ormException, boolean changeUpToDate)
+      throws OrmException, IOException {
+    setupChangeAccessRelatedMocks(
+        changeExist, ormException, DO_NOT_THROW_IO_EXCEPTION, changeUpToDate);
+  }
+
+  private void setupChangeAccessRelatedMocks(
+      boolean changeExists, boolean ormException, boolean ioException, boolean changeIsUpToDate)
       throws OrmException, IOException {
     if (ormException) {
       doThrow(new OrmException("")).when(schemaFactoryMock).open();
     } else {
       when(schemaFactoryMock.open()).thenReturn(dbMock);
       if (changeExists) {
-        when(changeFinderMock.findOne(TEST_CHANGE_ID)).thenReturn(changeNotes);
+        when(changeCheckerFactoryMock.create(TEST_CHANGE_ID)).thenReturn(changeCheckerPresentMock);
+        when(changeCheckerPresentMock.getChangeNotes()).thenReturn(Optional.of(changeNotes));
         if (ioException) {
           doThrow(new IOException("io-error"))
               .when(indexerMock)
               .index(any(ReviewDb.class), any(Change.class));
         }
-      } else {
-        when(changeFinderMock.findOne(TEST_CHANGE_ID)).thenReturn(null);
       }
     }
+
+    when(changeCheckerPresentMock.isChangeUpToDate(any())).thenReturn(changeIsUpToDate);
   }
 }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
index 43c2fd7..3ec979c 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -21,9 +21,11 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
 
 import com.ericsson.gerrit.plugins.highavailability.forwarder.Context;
 import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder;
+import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent;
 import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexAccountTask;
 import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexChangeTask;
 import com.ericsson.gerrit.plugins.highavailability.index.IndexEventHandler.IndexGroupTask;
@@ -31,6 +33,7 @@
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.reviewdb.client.AccountGroup;
 import com.google.gerrit.reviewdb.client.Change;
+import java.util.Optional;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.junit.Before;
 import org.junit.Test;
@@ -49,17 +52,22 @@
 
   private IndexEventHandler indexEventHandler;
   @Mock private Forwarder forwarder;
+  @Mock private ChangeCheckerImpl.Factory changeCheckerFactoryMock;
+  @Mock private ChangeChecker changeCheckerMock;
   private Change.Id changeId;
   private Account.Id accountId;
   private AccountGroup.UUID accountGroupUUID;
 
   @Before
-  public void setUpMocks() {
+  public void setUpMocks() throws Exception {
     changeId = new Change.Id(CHANGE_ID);
     accountId = new Account.Id(ACCOUNT_ID);
     accountGroupUUID = new AccountGroup.UUID(UUID);
+    when(changeCheckerFactoryMock.create(any())).thenReturn(changeCheckerMock);
+    when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent()));
     indexEventHandler =
-        new IndexEventHandler(MoreExecutors.directExecutor(), PLUGIN_NAME, forwarder);
+        new IndexEventHandler(
+            MoreExecutors.directExecutor(), PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
   }
 
   @Test
@@ -116,17 +124,19 @@
   @Test
   public void duplicateChangeEventOfAQueuedEventShouldGetDiscarded() {
     ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
-    indexEventHandler = new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder);
+    indexEventHandler =
+        new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
     indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
     indexEventHandler.onChangeIndexed(PROJECT_NAME, changeId.get());
     verify(poolMock, times(1))
-        .execute(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false));
+        .execute(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null));
   }
 
   @Test
   public void duplicateAccountEventOfAQueuedEventShouldGetDiscarded() {
     ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
-    indexEventHandler = new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder);
+    indexEventHandler =
+        new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
     indexEventHandler.onAccountIndexed(accountId.get());
     indexEventHandler.onAccountIndexed(accountId.get());
     verify(poolMock, times(1)).execute(indexEventHandler.new IndexAccountTask(ACCOUNT_ID));
@@ -135,7 +145,8 @@
   @Test
   public void duplicateGroupEventOfAQueuedEventShouldGetDiscarded() {
     ScheduledThreadPoolExecutor poolMock = mock(ScheduledThreadPoolExecutor.class);
-    indexEventHandler = new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder);
+    indexEventHandler =
+        new IndexEventHandler(poolMock, PLUGIN_NAME, forwarder, changeCheckerFactoryMock);
     indexEventHandler.onGroupIndexed(accountGroupUUID.get());
     indexEventHandler.onGroupIndexed(accountGroupUUID.get());
     verify(poolMock, times(1)).execute(indexEventHandler.new IndexGroupTask(UUID));
@@ -143,7 +154,8 @@
 
   @Test
   public void testIndexChangeTaskToString() throws Exception {
-    IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false);
+    IndexChangeTask task =
+        indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null);
     assertThat(task.toString())
         .isEqualTo(
             String.format("[%s] Index change %s in target instance", PLUGIN_NAME, CHANGE_ID));
@@ -166,29 +178,31 @@
 
   @Test
   public void testIndexChangeTaskHashCodeAndEquals() {
-    IndexChangeTask task = indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false);
+    IndexChangeTask task =
+        indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null);
 
     IndexChangeTask sameTask = task;
     assertThat(task.equals(sameTask)).isTrue();
     assertThat(task.hashCode()).isEqualTo(sameTask.hashCode());
 
     IndexChangeTask identicalTask =
-        indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false);
+        indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID, false, null);
     assertThat(task.equals(identicalTask)).isTrue();
     assertThat(task.hashCode()).isEqualTo(identicalTask.hashCode());
 
     assertThat(task.equals(null)).isFalse();
     assertThat(
-            task.equals(indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID + 1, false)))
+            task.equals(
+                indexEventHandler.new IndexChangeTask(PROJECT_NAME, CHANGE_ID + 1, false, null)))
         .isFalse();
     assertThat(task.hashCode()).isNotEqualTo("test".hashCode());
 
     IndexChangeTask differentChangeIdTask =
-        indexEventHandler.new IndexChangeTask(PROJECT_NAME, 123, false);
+        indexEventHandler.new IndexChangeTask(PROJECT_NAME, 123, false, null);
     assertThat(task.equals(differentChangeIdTask)).isFalse();
     assertThat(task.hashCode()).isNotEqualTo(differentChangeIdTask.hashCode());
 
-    IndexChangeTask removeTask = indexEventHandler.new IndexChangeTask("", CHANGE_ID, true);
+    IndexChangeTask removeTask = indexEventHandler.new IndexChangeTask("", CHANGE_ID, true, null);
     assertThat(task.equals(removeTask)).isFalse();
     assertThat(task.hashCode()).isNotEqualTo(removeTask.hashCode());
   }