Leverage BrokerApi interface

Use the external definition of the BrokerApi provided by
the GerritForge's library instead of one defined internally.

Using a well-defined external interface allows to use
other non-Kafka based implementation of the broker.

Feature: Issue 11599
Change-Id: I15d3175f3250698bba3885823fcc9196d640340d
diff --git a/BUILD b/BUILD
index 6736634..99a5344 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,7 @@
     deps = [
         "@kafka-client//jar",
         "@global-refdb//jar",
+        "@events-broker//jar",
         "//plugins/replication",
     ],
 )
@@ -47,5 +48,6 @@
         "@testcontainers-kafka//jar",
         "//lib/testcontainers",
         "@global-refdb//jar",
+        "@events-broker//jar",
     ],
 )
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 15cdb05..6b60626 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -24,3 +24,9 @@
         artifact = "com.gerritforge:global-refdb:0.1.1",
         sha1 = "d6ab59906db7b20a52c8994502780b2a6ab23872",
     )
+
+    maven_jar(
+        name = "events-broker",
+        artifact = "com.gerritforge:events-broker:3.0.0",
+        sha1 = "8957403a97df5400cf9bd49b3979049dde4b3435",
+    )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index db7dd63..8e94a51 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,12 +14,12 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gerrit.extensions.systemstatus.ServerInformation;
 import com.google.gerrit.server.util.PluginLogFile;
 import com.google.gerrit.server.util.SystemLog;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import org.apache.log4j.PatternLayout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index 8b07115..560d6f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 
 public interface MessageLogger {
 
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index f0ad970..045c11e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -14,13 +14,13 @@
 
 package com.googlesource.gerrit.plugins.multisite;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.events.ProjectDeletedListener;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.Inject;
 import com.google.inject.Scopes;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
 import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
deleted file mode 100644
index 35350e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.function.Consumer;
-
-/** API for sending/receiving events through a message Broker. */
-public interface BrokerApi {
-
-  /**
-   * Send an event to a topic.
-   *
-   * @param topic
-   * @param event
-   * @return true if the event was successfully sent. False otherwise.
-   */
-  boolean send(String topic, Event event);
-
-  /**
-   * Receive asynchronously events from a topic.
-   *
-   * @param topic
-   * @param eventConsumer
-   */
-  void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
index 19cf0f7..d7a0429 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -14,8 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.function.Consumer;
 
 public class BrokerApiNoOp implements BrokerApi {
@@ -26,5 +27,5 @@
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+  public void receiveAsync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index e83fe53..9b84629 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -14,10 +14,11 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.function.Consumer;
 
 public class BrokerApiWrapper implements BrokerApi {
@@ -46,7 +47,7 @@
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
-    apiDelegate.get().receiveAync(topic, eventConsumer);
+  public void receiveAsync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+    apiDelegate.get().receiveAsync(topic, eventConsumer);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index 6983984..093b920 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
index ba5b532..e1c8bf9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.broker.kafka;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.events.Event;
@@ -26,7 +27,6 @@
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
 import java.util.UUID;
 import org.slf4j.Logger;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 7854aab..4591fff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -21,7 +23,6 @@
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger;
 import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
@@ -60,7 +61,7 @@
 
   @Override
   public void run() {
-    brokerApi.receiveAync(getTopic().topic(), this::processRecord);
+    brokerApi.receiveAsync(getTopic().topic(), this::processRecord);
   }
 
   protected abstract EventTopic getTopic();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 680e8ed..80aee02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,6 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite.consumer;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
+
 public interface DroppedEventListener {
   /**
    * Invoked when any event is dropped.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
deleted file mode 100644
index b8bd0d8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.consumer;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.gerrit.server.events.Event;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.UUID;
-
-public class SourceAwareEventWrapper {
-
-  private final EventHeader header;
-  private final JsonObject body;
-
-  public EventHeader getHeader() {
-    return header;
-  }
-
-  public JsonObject getBody() {
-    return body;
-  }
-
-  public Event getEventBody(Gson gson) {
-    return gson.fromJson(this.body, Event.class);
-  }
-
-  public static class EventHeader {
-    private final UUID eventId;
-    private final String eventType;
-    private final UUID sourceInstanceId;
-    private final Long eventCreatedOn;
-
-    public EventHeader(UUID eventId, String eventType, UUID sourceInstanceId, Long eventCreatedOn) {
-      this.eventId = eventId;
-      this.eventType = eventType;
-      this.sourceInstanceId = sourceInstanceId;
-      this.eventCreatedOn = eventCreatedOn;
-    }
-
-    public UUID getEventId() {
-      return eventId;
-    }
-
-    public String getEventType() {
-      return eventType;
-    }
-
-    public UUID getSourceInstanceId() {
-      return sourceInstanceId;
-    }
-
-    public Long getEventCreatedOn() {
-      return eventCreatedOn;
-    }
-
-    public void validate() {
-      requireNonNull(eventId, "EventId cannot be null");
-      requireNonNull(eventType, "EventType cannot be null");
-      requireNonNull(sourceInstanceId, "Source Instance ID cannot be null");
-    }
-
-    @Override
-    public String toString() {
-      return "{"
-          + "eventId="
-          + eventId
-          + ", eventType='"
-          + eventType
-          + '\''
-          + ", sourceInstanceId="
-          + sourceInstanceId
-          + ", eventCreatedOn="
-          + eventCreatedOn
-          + '}';
-    }
-  }
-
-  public SourceAwareEventWrapper(EventHeader header, JsonObject body) {
-    this.header = header;
-    this.body = body;
-  }
-
-  public void validate() {
-    requireNonNull(header, "Header cannot be null");
-    requireNonNull(body, "Body cannot be null");
-    header.validate();
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index dff21c1..dc64ced 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -14,9 +14,9 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index c2cc3dc..348af20 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -14,8 +14,8 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 1a8b652..12606e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -16,9 +16,9 @@
 
 import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic.PROJECT_LIST_TOPIC;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index ed3a717..05eeedd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -14,10 +14,10 @@
 
 package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
index ff23b78..8dd853b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
@@ -14,13 +14,13 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.server.events.Event;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
 import java.util.ArrayList;
@@ -47,7 +47,7 @@
   }
 
   @Override
-  public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+  public void receiveAsync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
     KafkaEventSubscriber subscriber = subscriberProvider.get();
     synchronized (subscribers) {
       subscribers.add(subscriber);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
index 665ad49..341f45d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -14,6 +14,7 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.inject.TypeLiteral;
@@ -24,7 +25,6 @@
 import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
 import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
 import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
index 38b8d61..7b3be66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
@@ -14,11 +14,11 @@
 
 package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.Map;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
index c72fa0d..69d7d47 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
@@ -15,12 +15,12 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.common.flogger.FluentLogger;
 import com.google.gerrit.server.util.ManualRequestContext;
 import com.google.gerrit.server.util.OneOffRequestContext;
 import com.google.inject.Inject;
 import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 2abc7b0..194e891 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -5,6 +5,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
 import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.server.events.Event;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 9c7b8a9..a8e7b3a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -19,6 +19,8 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gerrit.acceptance.AbstractDaemonTest;
 import com.google.gerrit.acceptance.GerritConfig;
 import com.google.gerrit.acceptance.LogThreshold;
@@ -46,12 +48,10 @@
 import com.googlesource.gerrit.plugins.multisite.GitModule;
 import com.googlesource.gerrit.plugins.multisite.Module;
 import com.googlesource.gerrit.plugins.multisite.PluginModule;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
 import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
 import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
 import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
 import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
index 239b586..05a67db 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
@@ -17,9 +17,9 @@
 import static com.google.common.truth.Truth.assertThat;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
 import com.google.gson.Gson;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
 import java.util.UUID;
 import org.junit.Before;
 import org.junit.Test;