Log published/consumed messages in message_log

Produced and consumed events are appended to a new rotated message_log
file.

Note that broker connection information, errors, debugs and retries will
still go to the error_log file.

Bug: Issue 10577
Change-Id: I59d130037c82eb016088b1a3dbbf3a374d22e413
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/DisabledMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/DisabledMessageLogger.java
new file mode 100644
index 0000000..258314f
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/DisabledMessageLogger.java
@@ -0,0 +1,23 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
+
+public class DisabledMessageLogger implements MessageLogger {
+
+  @Override
+  public void log(Direction direction, SourceAwareEventWrapper event) {}
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
new file mode 100644
index 0000000..3070c2a
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.google.gerrit.extensions.systemstatus.ServerInformation;
+import com.google.gerrit.server.util.PluginLogFile;
+import com.google.gerrit.server.util.SystemLog;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
+import org.apache.log4j.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class Log4jMessageLogger extends PluginLogFile implements MessageLogger {
+  private static final String LOG_NAME = "message_log";
+  private final Logger msgLog;
+
+  @Inject
+  public Log4jMessageLogger(SystemLog systemLog, ServerInformation serverInfo) {
+    super(systemLog, serverInfo, LOG_NAME, new PatternLayout("[%d{ISO8601}] [%t] %-5p : %m%n"));
+    msgLog = LoggerFactory.getLogger(LOG_NAME);
+  }
+
+  @Override
+  public void log(Direction direction, SourceAwareEventWrapper event) {
+    msgLog.info("{} Header[{}] Body[{}]", direction, event.getHeader(), event.getBody());
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
new file mode 100644
index 0000000..8c8d949
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -0,0 +1,27 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
+
+public interface MessageLogger {
+
+  public enum Direction {
+    PUBLISH,
+    CONSUME;
+  }
+
+  public void log(Direction direction, SourceAwareEventWrapper event);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 0a0fe98..5b900c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -15,8 +15,8 @@
 package com.googlesource.gerrit.plugins.multisite;
 
 import com.google.gerrit.extensions.annotations.PluginData;
+import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
@@ -38,7 +38,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Module extends AbstractModule {
+public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private final Configuration config;
 
@@ -50,6 +50,9 @@
   @Override
   protected void configure() {
 
+    listener().to(Log4jMessageLogger.class);
+    bind(MessageLogger.class).to(Log4jMessageLogger.class);
+
     install(new ForwarderModule());
 
     if (config.cache().synchronize()) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
index 11310b5..cce9cc5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
@@ -22,6 +22,8 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
 import java.util.UUID;
@@ -35,12 +37,15 @@
   private final BrokerSession session;
   private final Gson gson;
   private final UUID instanceId;
+  private final MessageLogger msgLog;
 
   @Inject
-  public BrokerPublisher(BrokerSession session, Gson gson, @InstanceId UUID instanceId) {
+  public BrokerPublisher(
+      BrokerSession session, Gson gson, @InstanceId UUID instanceId, MessageLogger msgLog) {
     this.session = session;
     this.gson = gson;
     this.instanceId = instanceId;
+    this.msgLog = msgLog;
   }
 
   @Override
@@ -58,11 +63,13 @@
   }
 
   public boolean publishEvent(EventFamily eventType, Event event) {
-    return session.publishEvent(eventType, getPayload(event));
+    SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
+    msgLog.log(Direction.PUBLISH, brokerEvent);
+    return session.publishEvent(eventType, getPayload(brokerEvent));
   }
 
-  private String getPayload(Event event) {
-    return gson.toJson(toBrokerEvent(event));
+  private String getPayload(SourceAwareEventWrapper event) {
+    return gson.toJson(event);
   }
 
   private SourceAwareEventWrapper toBrokerEvent(Event event) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
index ec9ee2c..fa73964 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
@@ -24,6 +24,8 @@
 import com.google.inject.Provider;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.ForwardedEventRouter;
 import java.io.IOException;
@@ -50,6 +52,7 @@
   private final Deserializer<SourceAwareEventWrapper> valueDeserializer;
   private final Configuration configuration;
   private final OneOffRequestContext oneOffCtx;
+  private final MessageLogger msgLog;
 
   public AbstractKafkaSubcriber(
       Configuration configuration,
@@ -59,13 +62,15 @@
       DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx) {
+      OneOffRequestContext oneOffCtx,
+      MessageLogger msgLog) {
     this.configuration = configuration;
     this.eventRouter = eventRouter;
     this.droppedEventListeners = droppedEventListeners;
     this.gsonProvider = gsonProvider;
     this.instanceId = instanceId;
     this.oneOffCtx = oneOffCtx;
+    this.msgLog = msgLog;
     final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(AbstractKafkaSubcriber.class.getClassLoader());
@@ -116,7 +121,7 @@
         droppedEventListeners.forEach(l -> l.onEventDropped(event));
       } else {
         try {
-          logger.atInfo().log("Header[%s] Body[%s]", event.getHeader(), event.getBody());
+          msgLog.log(Direction.CONSUME, event);
           eventRouter.route(event.getEventBody(gsonProvider));
         } catch (IOException e) {
           logger.atSevere().withCause(e).log(
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
index 01511be..0e33c00 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/CacheEvictionEventSubscriber.java
@@ -22,6 +22,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
 import java.util.UUID;
@@ -38,7 +39,8 @@
       DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx) {
+      OneOffRequestContext oneOffCtx,
+      MessageLogger msgLog) {
     super(
         configuration,
         keyDeserializer,
@@ -47,7 +49,8 @@
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx);
+        oneOffCtx,
+        msgLog);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
index abef7e6..b61d23d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
@@ -22,6 +22,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.IndexEventRouter;
 import java.util.UUID;
@@ -38,7 +39,8 @@
       DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx) {
+      OneOffRequestContext oneOffCtx,
+      MessageLogger msgLog) {
     super(
         configuration,
         keyDeserializer,
@@ -47,7 +49,8 @@
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx);
+        oneOffCtx,
+        msgLog);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
index 648746e..207848e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
@@ -22,6 +22,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.ProjectListUpdateRouter;
 import java.util.UUID;
@@ -38,7 +39,8 @@
       DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx) {
+      OneOffRequestContext oneOffCtx,
+      MessageLogger msgLog) {
     super(
         configuration,
         keyDeserializer,
@@ -47,7 +49,8 @@
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx);
+        oneOffCtx,
+        msgLog);
   }
 
   @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
index 756af54..c55be53 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
@@ -22,6 +22,7 @@
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.StreamEventRouter;
 import java.util.UUID;
@@ -38,7 +39,8 @@
       DynamicSet<DroppedEventListener> droppedEventListeners,
       Provider<Gson> gsonProvider,
       @InstanceId UUID instanceId,
-      OneOffRequestContext oneOffCtx) {
+      OneOffRequestContext oneOffCtx,
+      MessageLogger msgLog) {
     super(
         configuration,
         keyDeserializer,
@@ -47,7 +49,8 @@
         droppedEventListeners,
         gsonProvider,
         instanceId,
-        oneOffCtx);
+        oneOffCtx,
+        msgLog);
   }
 
   @Override
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index 329774e..fe73065 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -27,6 +27,8 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import com.googlesource.gerrit.plugins.multisite.DisabledMessageLogger;
+import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
@@ -37,11 +39,12 @@
 
 public class BrokerPublisherTest {
   private BrokerPublisher publisher;
+  private MessageLogger NO_MSG_LOG = new DisabledMessageLogger();
   private Gson gson = new GsonProvider().get();
 
   @Before
   public void setUp() {
-    publisher = new BrokerPublisher(new TestBrokerSession(), gson, UUID.randomUUID());
+    publisher = new BrokerPublisher(new TestBrokerSession(), gson, UUID.randomUUID(), NO_MSG_LOG);
   }
 
   @Test