Fix deserialization of Event from raw message
diff --git a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
index 9d45d67..52cf721 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/nats/NatsConsumer.java
@@ -16,6 +16,8 @@
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.events.Event;
+import com.google.gerrit.server.events.EventGson;
+import com.google.gson.Gson;
import com.google.inject.Inject;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
@@ -53,20 +55,23 @@
private final Connection connection;
private final JetStream jetStream;
private final JetStreamManagement jetStreamManagement;
- private Dispatcher dispatcher;
private JetStreamSubscription subscription;
+ private final Dispatcher dispatcher;
+ private final Gson gson;
@Inject
public NatsConsumer(
Configuration config,
Connection connection,
JetStream jetStream,
- JetStreamManagement jetStreamManagement) {
+ JetStreamManagement jetStreamManagement,
+ @EventGson Gson gson) {
this.config = config;
this.connection = connection;
this.dispatcher = connection.createDispatcher();
this.jetStream = jetStream;
this.jetStreamManagement = jetStreamManagement;
+ this.gson = gson;
}
public void subscribe(String streamName, java.util.function.Consumer<Event> messageProcessor) {
@@ -112,7 +117,8 @@
@Override
public void onMessage(Message msg) throws InterruptedException {
- Event e = new Event(new String(msg.getData(), StandardCharsets.UTF_8)) {};
+ String json = new String(msg.getData(), StandardCharsets.UTF_8);
+ Event e = gson.fromJson(json, Event.class);
messageProcessor.accept(e);
msg.ack();
logger.atFine().log("NATS consumer - consumed and acked event '%s'", e);