Retry to publish events indefinitely

When the plugin receives a ShutdownSignalException due to flaky LDAP
connections returning 403 Forbidden, it should not stop attempting to
publish events. Instead enter a 30s retry loop.

This will improve resiliency to network shakiness.

Solves: Jira GER-2002
Change-Id: Ia723c30099f17568a534f11b120088b14fe00a8d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
index 18e7434..418e2b4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
@@ -14,11 +14,17 @@
 
 package com.googlesource.gerrit.plugins.eventseiffel;
 
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.WaitStrategies;
 import com.google.common.flogger.FluentLogger;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.eventseiffel.eiffel.api.EiffelEventPublisher;
 import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class PublishEventWorker implements Runnable, EiffelEventHub.Consumer {
   static final String THREAD_NAME = "eiffel-event-publishing";
@@ -43,7 +49,7 @@
         events = eventQueue.take(publisher.maxBatchSize());
         boolean unpublishedEvents = events != null && !events.isEmpty();
         while (running && unpublishedEvents) {
-          unpublishedEvents = !publisher.publish(events);
+          unpublishedEvents = !publishWithRetry(events);
           if (!unpublishedEvents) {
             eventQueue.ack(events);
             events = null;
@@ -53,6 +59,9 @@
           }
         }
       }
+    } catch (RetryException | ExecutionException e) {
+      logger.atSevere().withCause(e).log("Publishing failed even after retries.");
+      running = false;
     } catch (InterruptedException e) {
       if (running) {
         logger.atSevere().withCause(e).log("Publisher thread interrupted while running.");
@@ -67,6 +76,29 @@
     }
   }
 
+  private boolean publishWithRetry(Set<EiffelEvent> events)
+      throws RetryException, ExecutionException {
+    int timeout = 30;
+    Retryer<Boolean> retryer =
+        RetryerBuilder.<Boolean>newBuilder()
+            .retryIfException()
+            .withWaitStrategy(WaitStrategies.fixedWait(timeout, TimeUnit.SECONDS))
+            .build();
+    return retryer.call(
+        () -> {
+          try {
+            return publisher.publish(events);
+          } catch (Exception e) {
+            logger.atWarning().withCause(e).log(
+                "Publishing event failed. Will retry again after %d seconds", timeout);
+            if (running) {
+              throw e;
+            }
+            return false;
+          }
+        });
+  }
+
   @Override
   public void start(EiffelEventQueue queue) {
     if (thread != null && thread.isAlive()) {