Merge branch 'stable-3.6' into stable-3.7
* stable-3.6:
Log publishing of stream events in message_log file
Change-Id: I166e67f7b626f5390c37b481ed0ef3cbe752db31
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index a0df313..5c9fb22 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -39,16 +39,19 @@
private final KafkaProperties properties;
private final Provider<Producer<String, String>> producerProvider;
private final KafkaEventsPublisherMetrics publisherMetrics;
+ private final Log4JKafkaMessageLogger msgLog;
private volatile Producer<String, String> producer;
@Inject
public KafkaSession(
Provider<Producer<String, String>> producerProvider,
KafkaProperties properties,
- KafkaEventsPublisherMetrics publisherMetrics) {
+ KafkaEventsPublisherMetrics publisherMetrics,
+ Log4JKafkaMessageLogger msgLog) {
this.producerProvider = producerProvider;
this.properties = properties;
this.publisherMetrics = publisherMetrics;
+ this.msgLog = msgLog;
}
public boolean isOpen() {
@@ -137,6 +140,7 @@
RecordMetadata metadata = future.get();
LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
publisherMetrics.incrementBrokerPublishedMessage();
+ msgLog.log(topic, messageBody);
resultF.set(true);
return resultF;
} catch (Throwable e) {
@@ -154,6 +158,7 @@
(metadata, e) -> {
if (metadata != null && e == null) {
LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
+ msgLog.log(topic, messageBody);
publisherMetrics.incrementBrokerPublishedMessage();
} else {
LOGGER.error("Cannot send the message", e);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java
new file mode 100644
index 0000000..aedb670
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/Log4JKafkaMessageLogger.java
@@ -0,0 +1,40 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.kafka.session;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.log4j.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class Log4JKafkaMessageLogger extends PluginLogFile {
+ private static final String LOG_NAME = "message_log";
+ private final Logger msgLog;
+
+ @Inject
+ public Log4JKafkaMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+ super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
+ this.msgLog = LoggerFactory.getLogger(LOG_NAME);
+ }
+
+ public void log(String topic, String event) {
+ msgLog.info("PUBLISH {} {}", topic, event);
+ }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
index 3f83e92..ee528a1 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -43,6 +43,7 @@
import com.googlesource.gerrit.plugins.kafka.config.KafkaSubscriberProperties;
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import com.googlesource.gerrit.plugins.kafka.session.Log4JKafkaMessageLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -116,6 +117,8 @@
.toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS));
bind(KafkaProperties.class).toInstance(kafkaProperties);
+ bind(Log4JKafkaMessageLogger.class)
+ .toInstance(mock(Log4JKafkaMessageLogger.class, Answers.RETURNS_DEEP_STUBS));
bind(KafkaSession.class).in(Scopes.SINGLETON);
bindKafkaClientImpl();
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
index 59e7ca2..361de58 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaSessionTest.java
@@ -25,6 +25,7 @@
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties.ClientType;
import com.googlesource.gerrit.plugins.kafka.session.KafkaProducerProvider;
import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
+import com.googlesource.gerrit.plugins.kafka.session.Log4JKafkaMessageLogger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -44,6 +45,8 @@
@Mock KafkaProducerProvider producerProvider;
@Mock KafkaProperties properties;
@Mock KafkaEventsPublisherMetrics publisherMetrics;
+
+ @Mock Log4JKafkaMessageLogger msgLog;
@Captor ArgumentCaptor<Callback> callbackCaptor;
RecordMetadata recordMetadata;
@@ -59,7 +62,7 @@
recordMetadata = new RecordMetadata(new TopicPartition(topic, 0), 0L, 0L, 0L, 0L, 0, 0);
- objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics);
+ objectUnderTest = new KafkaSession(producerProvider, properties, publisherMetrics, msgLog);
}
@Test
@@ -72,6 +75,15 @@
}
@Test
+ public void shouldUpdateMessageLogFileWhenMessagePublishedInSyncMode() {
+ when(properties.isSendAsync()).thenReturn(false);
+ when(kafkaProducer.send(any())).thenReturn(Futures.immediateFuture(recordMetadata));
+ objectUnderTest.connect();
+ objectUnderTest.publish(message);
+ verify(msgLog).log(topic, message);
+ }
+
+ @Test
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInSyncMode() {
when(properties.isSendAsync()).thenReturn(false);
when(kafkaProducer.send(any())).thenReturn(Futures.immediateFailedFuture(new Exception()));
@@ -107,6 +119,19 @@
}
@Test
+ public void shouldUpdateMessageLogFileWhenMessagePublishedInAsyncMode() {
+ when(properties.isSendAsync()).thenReturn(true);
+ when(kafkaProducer.send(any(), any())).thenReturn(Futures.immediateFuture(recordMetadata));
+
+ objectUnderTest.connect();
+ objectUnderTest.publish(message);
+
+ verify(kafkaProducer).send(any(), callbackCaptor.capture());
+ callbackCaptor.getValue().onCompletion(recordMetadata, null);
+ verify(msgLog).log(topic, message);
+ }
+
+ @Test
public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublishingFailedInAsyncMode() {
when(properties.isSendAsync()).thenReturn(true);
when(kafkaProducer.send(any(), any()))