Log publishing of stream events in message_log file

Currently, when the plugin publishes a stream event to Kafka, the
message_log file is not updated. This is in contrast with other types
of events, which are logged in the file upon successful publishing of
the messages.

Bring the logging of stream event publishing in line with other Gerrit
events. Upon successful publishing of an event, update the message_log
file with the direction, topic name and message payload.

Bug: Issue 294904654
Change-Id: I611beca696d4cdb6951b6ac15afab1a391eb446e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index a0df313..5c9fb22 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -39,16 +39,19 @@
   private final KafkaProperties properties;
   private final Provider<Producer<String, String>> producerProvider;
   private final KafkaEventsPublisherMetrics publisherMetrics;
+  private final Log4JKafkaMessageLogger msgLog;
   private volatile Producer<String, String> producer;
 
   @Inject
   public KafkaSession(
       Provider<Producer<String, String>> producerProvider,
       KafkaProperties properties,
-      KafkaEventsPublisherMetrics publisherMetrics) {
+      KafkaEventsPublisherMetrics publisherMetrics,
+      Log4JKafkaMessageLogger msgLog) {
     this.producerProvider = producerProvider;
     this.properties = properties;
     this.publisherMetrics = publisherMetrics;
+    this.msgLog = msgLog;
   }
 
   public boolean isOpen() {
@@ -137,6 +140,7 @@
       RecordMetadata metadata = future.get();
       LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
       publisherMetrics.incrementBrokerPublishedMessage();
+      msgLog.log(topic, messageBody);
       resultF.set(true);
       return resultF;
     } catch (Throwable e) {
@@ -154,6 +158,7 @@
               (metadata, e) -> {
                 if (metadata != null && e == null) {
                   LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+                  msgLog.log(topic, messageBody);
                   publisherMetrics.incrementBrokerPublishedMessage();
                 } else {
                   LOGGER.error("Cannot send the message", e);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java
new file mode 100644
index 0000000..aedb670
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.session;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.log4j.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class Log4JKafkaMessageLogger extends PluginLogFile {
+  private static final String LOG_NAME = "message_log";
+  private final Logger msgLog;
+
+  @Inject
+  public Log4JKafkaMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+    super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
+    this.msgLog = LoggerFactory.getLogger(LOG_NAME);
+  }
+
+  public void log(String topic, String event) {
+    msgLog.info("PUBLISH {} {}", topic, event);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 3f83e92..ee528a1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -43,6 +43,7 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import com.googlesource.gerrit.plugins.kafka.session.Log4JKafkaMessageLogger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -116,6 +117,8 @@
           .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
 
       bind(KafkaProperties.class).toInstance(kafkaProperties);
+      bind(Log4JKafkaMessageLogger.class)
+          .toInstance(mock(Log4JKafkaMessageLogger.class, Answers.RETURNS_DEEP_STUBS));
       bind(KafkaSession.class).in(Scopes.SINGLETON);
 
       bindKafkaClientImpl();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index 59e7ca2..361de58 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -25,6 +25,7 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import com.googlesource.gerrit.plugins.kafka.session.Log4JKafkaMessageLogger;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -44,6 +45,8 @@
   @Mock KafkaProducerProvider producerProvider;
   @Mock KafkaProperties properties;
   @Mock KafkaEventsPublisherMetrics publisherMetrics;
+
+  @Mock Log4JKafkaMessageLogger msgLog;
   @Captor ArgumentCaptor<Callback> callbackCaptor;
 
   RecordMetadata recordMetadata;
@@ -59,7 +62,7 @@
 
     recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
 
-    objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
+    objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics, msgLog);
   }
 
   @Test
@@ -72,6 +75,15 @@
   }
 
   @Test
+  public void shouldUpdateMessageLogFileWhenMessagePublishedInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false);
+    when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+    objectUnderTest.connect();
+    objectUnderTest.publish(message);
+    verify(msgLog).log(topic, message);
+  }
+
+  @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
@@ -107,6 +119,19 @@
   }
 
   @Test
+  public void shouldUpdateMessageLogFileWhenMessagePublishedInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
+
+    objectUnderTest.connect();
+    objectUnderTest.publish(message);
+
+    verify(kafkaProducer).send(any(), callbackCaptor.capture());
+    callbackCaptor.getValue().onCompletion(recordMetadata, null);
+    verify(msgLog).log(topic, message);
+  }
+
+  @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInAsyncMode() {
     when(properties.isSendAsync()).thenReturn(true);
     when(kafkaProducer.send(any(), any()))