Introduce the abstract BrokerApi

Define a generic interface to communicate with an external
message broker.

Implement the default KafkaBrokerApi reusing the existing class
hierarchy, so that the impact and risk introduced by this change
is reduced to a minimum.

Start using the BrokerApi in the CacheEvictionForwarder.

Change-Id: I1207f174c96c48cbd7c0e309ef32270a6fbf364e
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index ad752d7..55a7168 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -22,8 +22,10 @@
 import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.ProvisionException;
+import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.spi.Message;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
@@ -31,6 +33,7 @@
 import com.googlesource.gerrit.plugins.multisite.event.EventModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ForwarderModule;
 import com.googlesource.gerrit.plugins.multisite.index.IndexModule;
+import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
 import com.googlesource.gerrit.plugins.multisite.kafka.router.KafkaForwardedEventRouterModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ValidationModule;
 import java.io.BufferedReader;
@@ -115,6 +118,8 @@
       install(new IndexModule());
     }
 
+    bind(BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+
     install(kafkaForwardedEventRouterModule);
 
     install(kafkaBrokerForwarderModule);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
new file mode 100644
index 0000000..d6e6c4c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
@@ -0,0 +1,39 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.broker;
+
+import com.google.gerrit.server.events.Event;
+import java.util.function.Consumer;
+
+/** API for sending/receiving events through a message Broker. */
+public interface BrokerApi {
+
+  /**
+   * Send an event to a topic.
+   *
+   * @param topic
+   * @param event
+   * @return true if the event was successfully sent. False otherwise.
+   */
+  boolean send(String topic, Event event);
+
+  /**
+   * Receive asynchronously events from a topic.
+   *
+   * @param topic
+   * @param eventConsumer
+   */
+  void receiveAync(String topic, Consumer<Event> eventConsumer);
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
index 09f24a1..a74a196 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerPublisher.java
@@ -25,7 +25,6 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.SourceAwareEventWrapper;
 import java.util.UUID;
 import org.slf4j.Logger;
@@ -69,13 +68,13 @@
     }
   }
 
-  public boolean publishEvent(EventFamily eventType, Event event) {
+  public boolean publish(String topic, Event event) {
     if (Context.isForwardedEvent()) {
       return true;
     }
 
     SourceAwareEventWrapper brokerEvent = toBrokerEvent(event);
-    Boolean eventPublished = session.publishEvent(eventType, getPayload(brokerEvent));
+    Boolean eventPublished = session.publish(topic, getPayload(brokerEvent));
     if (eventPublished) {
       msgLog.log(Direction.PUBLISH, brokerEvent);
       brokerMetrics.incrementBrokerPublishedMessage();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
index 291071c..b04fbff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerSession.java
@@ -14,8 +14,6 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
-import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
-
 public interface BrokerSession {
 
   boolean isOpen();
@@ -24,5 +22,5 @@
 
   void disconnect();
 
-  boolean publishEvent(EventFamily eventFamily, String payload);
+  boolean publish(String topic, String payload);
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
index aaf329a..802abc1 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/KafkaSession.java
@@ -81,8 +81,8 @@
   }
 
   @Override
-  public boolean publishEvent(EventFamily eventType, String payload) {
-    return publishToTopic(properties.getKafka().getTopic(eventType), payload);
+  public boolean publish(String topic, String payload) {
+    return publishToTopic(properties.getKafka().getTopic(EventFamily.fromTopic(topic)), payload);
   }
 
   private boolean publishToTopic(String topic, String payload) {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index 7891336..e354b87 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -16,22 +16,22 @@
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventFamily;
 
 @Singleton
 public class BrokerCacheEvictionForwarder implements CacheEvictionForwarder {
-  private final BrokerPublisher publisher;
+  private final BrokerApi broker;
 
   @Inject
-  BrokerCacheEvictionForwarder(BrokerPublisher publisher) {
-    this.publisher = publisher;
+  BrokerCacheEvictionForwarder(BrokerApi broker) {
+    this.broker = broker;
   }
 
   @Override
   public boolean evict(CacheEvictionEvent event) {
-    return publisher.publishEvent(EventFamily.CACHE_EVENT, event);
+    return broker.send(EventFamily.CACHE_EVENT.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index bf70c84..3812700 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -30,6 +30,6 @@
 
   @Override
   public boolean index(IndexEvent event) {
-    return publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    return publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index cb05a4e..ee8b1ae 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -33,6 +33,6 @@
 
   @Override
   public boolean updateProjectList(ProjectListUpdateEvent event) {
-    return publisher.publishEvent(PROJECT_LIST_EVENT, event);
+    return publisher.publish(PROJECT_LIST_EVENT.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index 63db260..b259bbe 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -32,6 +32,6 @@
 
   @Override
   public boolean send(Event event) {
-    return publisher.publishEvent(EventFamily.STREAM_EVENT, event);
+    return publisher.publish(EventFamily.STREAM_EVENT.topic(), event);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
index e8d45ed..b59f8c9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/events/EventFamily.java
@@ -17,12 +17,32 @@
 import com.google.common.base.CaseFormat;
 
 public enum EventFamily {
-  INDEX_EVENT,
-  CACHE_EVENT,
-  PROJECT_LIST_EVENT,
-  STREAM_EVENT;
+  INDEX_EVENT("GERRIT.EVENT.INDEX"),
+  CACHE_EVENT("GERRIT.EVENT.CACHE"),
+  PROJECT_LIST_EVENT("GERRIT.EVENT.PROJECT.LIST"),
+  STREAM_EVENT("GERRIT.EVENT.STREAM");
+
+  private final String topic;
+
+  private EventFamily(String topic) {
+    this.topic = topic;
+  }
 
   public String lowerCamelName() {
     return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name());
   }
+
+  public String topic() {
+    return topic;
+  }
+
+  public static EventFamily fromTopic(String topic) {
+    EventFamily[] eventFamilies = EventFamily.values();
+    for (EventFamily eventFamily : eventFamilies) {
+      if (eventFamily.topic.equals(topic)) {
+        return eventFamily;
+      }
+    }
+    return null;
+  }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
new file mode 100644
index 0000000..ac927ab
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
@@ -0,0 +1,42 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite.kafka;
+
+import com.google.gerrit.extensions.restapi.NotImplementedException;
+import com.google.gerrit.server.events.Event;
+import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerPublisher;
+import java.util.function.Consumer;
+
+public class KafkaBrokerApi implements BrokerApi {
+
+  private final BrokerPublisher publisher;
+
+  @Inject
+  public KafkaBrokerApi(BrokerPublisher publisher) {
+    this.publisher = publisher;
+  }
+
+  @Override
+  public boolean send(String topic, Event event) {
+    return publisher.publish(topic, event);
+  }
+
+  @Override
+  public void receiveAync(String topic, Consumer<Event> eventConsumer) {
+    throw new NotImplementedException();
+  }
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
index d8f55a1..d1a18c8 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisherTest.java
@@ -128,35 +128,34 @@
   @Test
   public void shouldIncrementBrokerMetricCounterWhenMessagePublished() {
     Event event = createSampleEvent();
-    when(session.publishEvent(any(), any())).thenReturn(true);
-    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    when(session.publish(any(), any())).thenReturn(true);
+    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
     verify(brokerMetrics, only()).incrementBrokerPublishedMessage();
   }
 
   @Test
   public void shouldIncrementBrokerFailedMetricCounterWhenMessagePublished() {
     Event event = createSampleEvent();
-    when(session.publishEvent(any(), any())).thenReturn(false);
+    when(session.publish(any(), any())).thenReturn(false);
 
-    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
     verify(brokerMetrics, only()).incrementBrokerFailedToPublishMessage();
   }
 
   @Test
   public void shouldLogEventPublishedMessageWhenPublishingSucceed() {
     Event event = createSampleEvent();
-    when(session.publishEvent(any(), any())).thenReturn(true);
-
-    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    when(session.publish(any(), any())).thenReturn(true);
+    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
     verify(msgLog, only()).log(any(), any());
   }
 
   @Test
   public void shouldSkipEventPublishedLoggingWhenMessagePublishigFailed() {
     Event event = createSampleEvent();
-    when(session.publishEvent(any(), any())).thenReturn(false);
+    when(session.publish(any(), any())).thenReturn(false);
 
-    publisher.publishEvent(EventFamily.INDEX_EVENT, event);
+    publisher.publish(EventFamily.INDEX_EVENT.topic(), event);
     verify(msgLog, never()).log(any(), any());
   }