Retry deserialization of events from queue

With this we do not have to rename plugins to make
custom event types registered before deserialization.

Change-Id: Icd51322852d3507dc711d9c6b04b303a70fb8a30
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
index f9bb9ce..ac4a972 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
@@ -17,10 +17,16 @@
 import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
 
 import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.events.Event;
 import com.google.gerrit.server.events.EventGson;
 import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
@@ -28,6 +34,8 @@
 import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPSubscriberSession;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 @Singleton
 public class BrokerApiSubscribers {
@@ -67,7 +75,7 @@
               logger.atFiner().log(
                   "The RabbitMqBrokerApi consumed event from topic %s with data: %s",
                   topic, messageBody);
-              Event event = gson.fromJson(messageBody, Event.class);
+              Event event = deserializeWithRetry(messageBody);
               if (event.type != null) {
                 topicSubscriber.consumer().accept(event);
               } else {
@@ -81,6 +89,39 @@
     return false;
   }
 
+  private Event deserializeWithRetry(String messageBody) {
+    int timeout = 5;
+    int retryTime = 5;
+    Retryer<Event> retryer =
+        RetryerBuilder.<Event>newBuilder()
+            .retryIfException()
+            .withWaitStrategy(WaitStrategies.fixedWait(timeout, TimeUnit.SECONDS))
+            .withStopStrategy(StopStrategies.stopAfterDelay(retryTime, TimeUnit.MINUTES))
+            .build();
+    try {
+      return retryer.call(
+          () -> {
+            try {
+              // May fail if not all plugins have registered their event types yet
+              return gson.fromJson(messageBody, Event.class);
+            } catch (JsonParseException e) {
+              logger.atWarning().withCause(e).log(
+                  "Deserializing json failed. Will retry again after %d seconds", timeout);
+              throw e;
+            }
+          });
+    } catch (RetryException e) {
+      logger.atSevere().withCause(e).log(
+          "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a plugin missing or failing to load.",
+          messageBody, retryTime);
+      return null;
+    } catch (ExecutionException e) {
+      // This should not happen
+      logger.atSevere().withCause(e).log("Retrying of json deserializing failed unexpectedly");
+      return null;
+    }
+  }
+
   public boolean removeSubscriber(TopicSubscriber topicSubscriber) {
     String consumerTag = consumerTags.remove(topicSubscriber);
     if (consumerTag == null) {
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
index 1153194..fe6b6de 100644
--- a/src/main/resources/Documentation/build.md
+++ b/src/main/resources/Documentation/build.md
@@ -36,15 +36,3 @@
 ```
   ./tools/eclipse/project.py
 ```
-
-Use RabbitMQBroker Api with multi-site or another setup with custom Gerrit events
-------------------------------------------------------------------------------------
-
-To make events-rabbitmq able to deserialize events from the rabbitMQ queues, every event type needs
-to be registered before. This means that every plugin that needs to register its own event types
-needs to load before events-rabbimq. Gerrit load plugins lexicographically based on the names of
-the jars of the plugins. So in the case of multi-site you could rename replication.jar to
-0-replication.jar. Do not forget to re-point any symlinks.
-```
-  mv gerrit/plugins/replication.jar gerrit/plugins/0-replication.jar
-```