Merge branch 'stable-3.6' into stable-3.7

* stable-3.6:
  Log publishing of stream events in message_log file

Change-Id: I166e67f7b626f5390c37b481ed0ef3cbe752db31
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index a0df313..5c9fb22 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -39,16 +39,19 @@
   private final KafkaProperties properties;
   private final Provider<Producer<String, String>> producerProvider;
   private final KafkaEventsPublisherMetrics publisherMetrics;
+  private final Log4JKafkaMessageLogger msgLog;
   private volatile Producer<String, String> producer;
 
   @Inject
   public KafkaSession(
       Provider<Producer<String, String>> producerProvider,
       KafkaProperties properties,
-      KafkaEventsPublisherMetrics publisherMetrics) {
+      KafkaEventsPublisherMetrics publisherMetrics,
+      Log4JKafkaMessageLogger msgLog) {
     this.producerProvider = producerProvider;
     this.properties = properties;
     this.publisherMetrics = publisherMetrics;
+    this.msgLog = msgLog;
   }
 
   public boolean isOpen() {
@@ -137,6 +140,7 @@
       RecordMetadata metadata = future.get();
       LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
       publisherMetrics.incrementBrokerPublishedMessage();
+      msgLog.log(topic, messageBody);
       resultF.set(true);
       return resultF;
     } catch (Throwable e) {
@@ -154,6 +158,7 @@
               (metadata, e) -> {
                 if (metadata != null && e == null) {
                   LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+                  msgLog.log(topic, messageBody);
                   publisherMetrics.incrementBrokerPublishedMessage();
                 } else {
                   LOGGER.error("Cannot send the message", e);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java
new file mode 100644
index 0000000..aedb670
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java
@@ -0,0 +1,60 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.session;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.log4j.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plugin log file named {@code message_log} that records messages published to Kafka.
+ *
+ * <p>The constructor passes the log name and line layout to {@link PluginLogFile}; entries are
+ * written through the SLF4J logger looked up under the same name.
+ */
+@Singleton
+public class Log4JKafkaMessageLogger extends PluginLogFile {
+  private static final String LOG_NAME = "message_log";
+  private static final String LINE_PATTERN = "[%d{ISO8601}] [%t] %-5p : %m%n";
+
+  // Initialized right after the superclass constructor returns, like a constructor assignment.
+  private final Logger publishLog = LoggerFactory.getLogger(LOG_NAME);
+
+  /**
+   * Creates the message logger.
+   *
+   * @param systemLog passed through to {@link PluginLogFile}
+   * @param serverInfo passed through to {@link PluginLogFile}
+   */
+  @Inject
+  public Log4JKafkaMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+    super(systemLog, serverInfo, LOG_NAME, new PatternLayout(LINE_PATTERN));
+  }
+
+  /**
+   * Writes one INFO entry of the form {@code PUBLISH <topic> <event>}.
+   *
+   * @param topic topic the message was published to
+   * @param event message body that was published
+   */
+  public void log(String topic, String event) {
+    publishLog.info("PUBLISH {} {}", topic, event);
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 3f83e92..ee528a1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -43,6 +43,7 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import com.googlesource.gerrit.plugins.kafka.session.Log4JKafkaMessageLogger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -116,6 +117,8 @@
           .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
 
       bind(KafkaProperties.class).toInstance(kafkaProperties);
+      bind(Log4JKafkaMessageLogger.class)
+          .toInstance(mock(Log4JKafkaMessageLogger.class, Answers.RETURNS_DEEP_STUBS));
       bind(KafkaSession.class).in(Scopes.SINGLETON);
 
       bindKafkaClientImpl();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index 59e7ca2..361de58 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -25,6 +25,7 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import com.googlesource.gerrit.plugins.kafka.session.Log4JKafkaMessageLogger;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -44,6 +45,8 @@
   @Mock KafkaProducerProvider producerProvider;
   @Mock KafkaProperties properties;
   @Mock KafkaEventsPublisherMetrics publisherMetrics;
+
+  @Mock Log4JKafkaMessageLogger msgLog;
   @Captor ArgumentCaptor<Callback> callbackCaptor;
 
   RecordMetadata recordMetadata;
@@ -59,7 +62,7 @@
 
     recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
 
-    objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
+    objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics, msgLog);
   }
 
   @Test
@@ -72,6 +75,15 @@
   }
 
   @Test
+  public void shouldUpdateMessageLogFileWhenMessagePublishedInSyncMode() {
+    when(properties.isSendAsync()).thenReturn(false); // run publish() in sync mode
+    when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+    objectUnderTest.connect();
+    objectUnderTest.publish(message); // send() is stubbed above to succeed immediately
+    verify(msgLog).log(topic, message); // one log(topic, message) call is expected
+  }
+
+  @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
     when(properties.isSendAsync()).thenReturn(false);
     when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
@@ -107,6 +119,19 @@
   }
 
   @Test
+  public void shouldUpdateMessageLogFileWhenMessagePublishedInAsyncMode() {
+    when(properties.isSendAsync()).thenReturn(true);
+    when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
+    // The stubbed send() only returns an immediate future; it never invokes the callback.
+    objectUnderTest.connect();
+    objectUnderTest.publish(message);
+    // Retrieve the callback given to send() and complete it by hand as a success (no error).
+    verify(kafkaProducer).send(any(), callbackCaptor.capture());
+    callbackCaptor.getValue().onCompletion(recordMetadata, null);
+    verify(msgLog).log(topic, message);
+  }
+
+  @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInAsyncMode() {
     when(properties.isSendAsync()).thenReturn(true);
     when(kafkaProducer.send(any(), any()))