Retry deserialization of events from queue
With this we do not have to rename plugins to make
custom event types registered before deserialization.
Change-Id: Icd51322852d3507dc711d9c6b04b303a70fb8a30
diff --git a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
index f9bb9ce..ac4a972 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/rabbitmq/message/BrokerApiSubscribers.java
@@ -17,10 +17,16 @@
import static com.gerritforge.gerrit.eventbroker.TopicSubscriber.topicSubscriber;
import com.gerritforge.gerrit.eventbroker.TopicSubscriber;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventGson;
import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.rabbitmq.config.Properties;
@@ -28,6 +34,8 @@
import com.googlesource.gerrit.plugins.rabbitmq.session.type.AMQPSubscriberSession;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
@Singleton
public class BrokerApiSubscribers {
@@ -67,7 +75,7 @@
logger.atFiner().log(
"The RabbitMqBrokerApi consumed event from topic %s with data: %s",
topic, messageBody);
- Event event = gson.fromJson(messageBody, Event.class);
+ Event event = deserializeWithRetry(messageBody);
if (event.type != null) {
topicSubscriber.consumer().accept(event);
} else {
@@ -81,6 +89,39 @@
return false;
}
+ private Event deserializeWithRetry(String messageBody) {
+ int timeout = 5;
+ int retryTime = 5;
+ Retryer<Event> retryer =
+ RetryerBuilder.<Event>newBuilder()
+ .retryIfException()
+ .withWaitStrategy(WaitStrategies.fixedWait(timeout, TimeUnit.SECONDS))
+ .withStopStrategy(StopStrategies.stopAfterDelay(retryTime, TimeUnit.MINUTES))
+ .build();
+ try {
+ return retryer.call(
+ () -> {
+ try {
+ // May fail if not all plugins have registered their event types yet
+ return gson.fromJson(messageBody, Event.class);
+ } catch (JsonParseException e) {
+ logger.atWarning().withCause(e).log(
+ "Deserializing json failed. Will retry again after %d seconds", timeout);
+ throw e;
+ }
+ });
+ } catch (RetryException e) {
+ logger.atSevere().withCause(e).log(
+ "Failed to deserialize event %s for %d minutes, stopping retries. This may be due to a plugin missing or failing to load.",
+ messageBody, retryTime);
+ return null;
+ } catch (ExecutionException e) {
+ // This should not happen
+ logger.atSevere().withCause(e).log("Retrying of json deserializing failed unexpectedly");
+ return null;
+ }
+ }
+
public boolean removeSubscriber(TopicSubscriber topicSubscriber) {
String consumerTag = consumerTags.remove(topicSubscriber);
if (consumerTag == null) {
diff --git a/src/main/resources/Documentation/build.md b/src/main/resources/Documentation/build.md
index 1153194..fe6b6de 100644
--- a/src/main/resources/Documentation/build.md
+++ b/src/main/resources/Documentation/build.md
@@ -36,15 +36,3 @@
```
./tools/eclipse/project.py
```
-
-Use RabbitMQBroker Api with multi-site or another setup with custom Gerrit events
-------------------------------------------------------------------------------------
-
-To make events-rabbitmq able to deserialize events from the rabbitMQ queues, every event type needs
-to be registered before. This means that every plugin that needs to register its own event types
-needs to load before events-rabbimq. Gerrit load plugins lexicographically based on the names of
-the jars of the plugins. So in the case of multi-site you could rename replication.jar to
-0-replication.jar. Do not forget to re-point any symlinks.
-```
- mv gerrit/plugins/replication.jar gerrit/plugins/0-replication.jar
-```