Filter out erroneous change during auto-reindex

The auto-reindex thread scans for all changes of all projects
to find out the ones that need reindexing. Avoid aborting the
whole process when one of them fails to be loaded.

Also, tidy up the readability of the whole scanning pipeline
by removing redundant data types declarations.

Change-Id: Iabbfcaca25750bbff745df9b3bbb04ad36fcbdd8
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java
index 0787953..fcd345d 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnable.java
@@ -23,7 +23,6 @@
 import com.google.gerrit.entities.Project;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.notedb.ChangeNotes;
-import com.google.gerrit.server.notedb.ChangeNotes.Factory.ChangeNotesResult;
 import com.google.gerrit.server.project.ProjectCache;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
@@ -83,7 +82,16 @@
         Stream<Change> projectChangesStream =
             notesFactory
                 .scan(repo, projectName)
-                .map((ChangeNotesResult changeNotes) -> changeNotes.notes().getChange());
+                .filter(
+                    cnr -> {
+                      if (cnr.error().isEmpty()) {
+                        return true;
+                      }
+                      log.atWarning().withCause(cnr.error().get()).log(
+                          "Error fetching change " + cnr.id());
+                      return false;
+                    })
+                .map(cnr -> cnr.notes().getChange());
         allChangesStream = Streams.concat(allChangesStream, projectChangesStream);
       }
     }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnableTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnableTest.java
index d0086d5..65001de 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnableTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/autoreindex/ChangeReindexRunnableTest.java
@@ -16,6 +16,7 @@
 
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -104,6 +105,31 @@
   }
 
   @Test
+  public void changeIsIndexedDuringRun_withInvalidExtraChange() throws Exception {
+    LocalDateTime currentTime = LocalDateTime.now(ZoneOffset.UTC);
+    Timestamp afterCurrentTime =
+        new Timestamp(currentTime.toEpochSecond(ZoneOffset.UTC) * 1000 + 1000L);
+    Change change = newChange(afterCurrentTime);
+
+    when(indexTs.getUpdateTs(AbstractIndexRestApiServlet.IndexName.CHANGE))
+        .thenReturn(Optional.of(currentTime));
+    when(projectCache.all()).thenReturn(ImmutableSortedSet.of(change.getProject()));
+    when(repoManager.openRepository(change.getProject())).thenReturn(repo);
+    ChangeNotesResult invalidChangeRes = mock(ChangeNotesResult.class);
+    lenient().when(invalidChangeRes.notes()).thenThrow(IllegalStateException.class);
+    when(invalidChangeRes.error()).thenReturn(Optional.of(new IllegalStateException()));
+    when(changeNotesFactory.scan(repo, change.getProject()))
+        .thenReturn(Stream.of(invalidChangeRes, changeNotesRes));
+    when(changeNotesRes.error()).thenReturn(Optional.empty());
+    when(changeNotesRes.notes()).thenReturn(changeNotes);
+    when(changeNotes.getChange()).thenReturn(change);
+
+    changeReindexRunnable.run();
+
+    verify(indexer).index(changeProjectIndexKey(change), Operation.INDEX, Optional.empty());
+  }
+
+  @Test
   public void groupIsNotIndexedWhenItIsCreatedBeforeLastGroupReindex() throws Exception {
     Timestamp currentTime = Timestamp.valueOf(LocalDateTime.now());
     Timestamp beforeCurrentTime = new Timestamp(currentTime.getTime() - 1000L);