Log publishing of stream events in message_log file
Currently, when the plugin publishes a stream event to Pubsub, the
message_log file is not updated. This is in contrast with other types
of events, which are logged in the file upon successful publishing of
the messages.
Bring the logging on stream events publishing in-line with other gerrit
events. Upon successful publishing of the event, update the message_log
file with the direction, topic name and message payload.
Bug: Issue 294904654
Change-Id: I9ab8be51af067a832590805aa77c701add2c1be5
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/Log4jPubsubMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Log4jPubsubMessageLogger.java
new file mode 100644
index 0000000..ac3f984
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/Log4jPubsubMessageLogger.java
@@ -0,0 +1,38 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.pubsub;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import org.apache.log4j.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Log4jPubsubMessageLogger extends PluginLogFile {
+ private static final String LOG_NAME = "message_log";
+ private final Logger msgLog;
+
+ @Inject
+ public Log4jPubsubMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+ super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
+ this.msgLog = LoggerFactory.getLogger(LOG_NAME);
+ }
+
+ public void log(String topic, String event) {
+ msgLog.info("PUBLISH {} {}", topic, event);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
index 2a030c9..29d4342 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisher.java
@@ -46,6 +46,7 @@
private final String topic;
private final Publisher publisher;
private final PubSubConfiguration pubSubProperties;
+ private final Log4jPubsubMessageLogger msgLog;
@Inject
public PubSubPublisher(
@@ -53,13 +54,15 @@
PublisherProvider publisherProvider,
@EventGson Gson gson,
PubSubPublisherMetrics publisherMetrics,
- @Assisted String topic)
+ @Assisted String topic,
+ Log4jPubsubMessageLogger msgLog)
throws IOException {
this.gson = gson;
this.publisherMetrics = publisherMetrics;
this.topic = topic;
this.publisher = publisherProvider.get(topic);
this.pubSubProperties = pubSubProperties;
+ this.msgLog = msgLog;
}
public ListenableFuture<Boolean> publish(Event event) {
@@ -92,6 +95,7 @@
messageId, topic, pubsubMessage.getData().toStringUtf8());
publisherMetrics.incrementSucceedToPublishMessage();
+ msgLog.log(topic, pubsubMessage.getData().toStringUtf8());
}
},
MoreExecutors.directExecutor());
diff --git a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
index d88f9d4..2c85652 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/pubsub/PubSubPublisherTest.java
@@ -39,6 +39,7 @@
@Mock PublisherProvider publisherProviderMock;
@Mock Publisher publisherMock;
@Mock PubSubPublisherMetrics pubSubPublisherMetricsMock;
+ @Mock Log4jPubsubMessageLogger msgLog;
private static final String TOPIC = "foo";
private static final Event eventMessage = new ProjectCreatedEvent();
@@ -52,7 +53,8 @@
publisherProviderMock,
OutputFormat.JSON_COMPACT.newGson(),
pubSubPublisherMetricsMock,
- TOPIC);
+ TOPIC,
+ msgLog);
}
@Test
@@ -73,4 +75,18 @@
verify(pubSubPublisherMetricsMock, only()).incrementSucceedToPublishMessage();
}
+
+ @Test
+ public void shouldUpdateMessageLogFileWhenAsyncPublishSucceeds() {
+ when(publisherMock.publish(any())).thenReturn(ApiFutures.immediateFuture("some-message-id"));
+
+ objectUnderTest.publish(eventMessage);
+
+ verify(msgLog)
+ .log(
+ TOPIC,
+ String.format(
+ "{\"type\":\"%s\",\"event_created_on\":%d}",
+ eventMessage.type, eventMessage.eventCreatedOn));
+ }
}