Retry to publish events indefinitely
When the plugin receives a ShutdownSignalException due to flaky LDAP
connections returning 403 Forbidden, it should not stop attempting to
publish events. Instead enter a 30s retry loop.
This will improve resiliency to network shakiness.
Solves: Jira GER-2002
Change-Id: Ia723c30099f17568a534f11b120088b14fe00a8d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
index 18e7434..418e2b4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/eventseiffel/PublishEventWorker.java
@@ -14,11 +14,17 @@
package com.googlesource.gerrit.plugins.eventseiffel;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.WaitStrategies;
import com.google.common.flogger.FluentLogger;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.api.EiffelEventPublisher;
import com.googlesource.gerrit.plugins.eventseiffel.eiffel.dto.EiffelEvent;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class PublishEventWorker implements Runnable, EiffelEventHub.Consumer {
static final String THREAD_NAME = "eiffel-event-publishing";
@@ -43,7 +49,7 @@
events = eventQueue.take(publisher.maxBatchSize());
boolean unpublishedEvents = events != null && !events.isEmpty();
while (running && unpublishedEvents) {
- unpublishedEvents = !publisher.publish(events);
+ unpublishedEvents = !publishWithRetry(events);
if (!unpublishedEvents) {
eventQueue.ack(events);
events = null;
@@ -53,6 +59,9 @@
}
}
}
+ } catch (RetryException | ExecutionException e) {
+ logger.atSevere().withCause(e).log("Publishing failed even after retries.");
+ running = false;
} catch (InterruptedException e) {
if (running) {
logger.atSevere().withCause(e).log("Publisher thread interrupted while running.");
@@ -67,6 +76,29 @@
}
}
+ private boolean publishWithRetry(Set<EiffelEvent> events)
+ throws RetryException, ExecutionException {
+ int timeout = 30;
+ Retryer<Boolean> retryer =
+ RetryerBuilder.<Boolean>newBuilder()
+ .retryIfException()
+ .withWaitStrategy(WaitStrategies.fixedWait(timeout, TimeUnit.SECONDS))
+ .build();
+ return retryer.call(
+ () -> {
+ try {
+ return publisher.publish(events);
+ } catch (Exception e) {
+ logger.atWarning().withCause(e).log(
+ "Publishing event failed. Will retry again after %d seconds", timeout);
+ if (running) {
+ throw e;
+ }
+ return false;
+ }
+ });
+ }
+
@Override
public void start(EiffelEventQueue queue) {
if (thread != null && thread.isAlive()) {