Use a stream based approach to walk directories

This approach allows solutions to be developed which operate on events
without waiting for the entire listings to be complete.

Change-Id: Ic6e07210d4e319781e70c5f692cfea9c64315f3b
diff --git a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
index cf34073..fe5e7cb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/replication/ReplicationTasksStorage.java
@@ -24,15 +24,15 @@
 import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
 import java.io.IOException;
-import java.nio.file.DirectoryIteratorException;
-import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.NotDirectoryException;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.eclipse.jgit.lib.ObjectId;
 import org.eclipse.jgit.transport.URIish;
 
@@ -71,7 +71,7 @@
       } catch (NoSuchFileException e) {
         logger.atFine().log("File %s not found while reading task", file);
       } catch (IOException e) {
-        if (!e.getMessage().equals("Is a directory")) {
+        if (!e.getMessage().contains("not a regular file")) {
           logger.atSevere().withCause(e).log("Error while reading task %s", file);
         }
       }
@@ -161,26 +161,26 @@
     return list(createDir(runningUpdates));
   }
 
-  private List<ReplicateRefUpdate> list(Path tasks) {
-    List<ReplicateRefUpdate> results = new ArrayList<>();
-    try (DirectoryStream<Path> events = Files.newDirectoryStream(tasks)) {
-      for (Path path : events) {
-        Optional<ReplicateRefUpdate> update = ReplicateRefUpdate.createOptionally(path);
-        if (update.isPresent()) {
-          results.add(update.get());
-        } else if (Files.isDirectory(path)) {
-          try {
-            results.addAll(list(path));
-          } catch (DirectoryIteratorException d) {
-            // iterating over the sub-directories is expected to have dirs disappear
-            Nfs.throwIfNotStaleFileHandle(d.getCause());
-          }
-        }
-      }
-    } catch (IOException e) {
-      logger.atSevere().withCause(e).log("Error while listing tasks");
+  private List<ReplicateRefUpdate> list(Path taskDir) {
+    return streamRecursive(taskDir).collect(Collectors.toList());
+  }
+
+  private Stream<ReplicateRefUpdate> streamRecursive(Path dir) {
+    return walk(dir)
+        .map(path -> ReplicateRefUpdate.createOptionally(path))
+        .filter(Optional::isPresent)
+        .map(Optional::get);
+  }
+
+  private static Stream<Path> walk(Path path) {
+    try {
+      return Stream.concat(Stream.of(path), Files.list(path).flatMap(sub -> walk(sub)));
+    } catch (NotDirectoryException e) {
+      return Stream.of(path);
+    } catch (Exception e) {
+      logger.atSevere().withCause(e).log("Error while walking directory %s", path);
+      return Stream.empty();
     }
-    return results;
   }
 
   @SuppressWarnings("deprecation")