Leverage BrokerApi interface
Use the external definition of the BrokerApi provided by
the GerritForge's library instead of one defined internally.
Using a well-defined external interface allows to use
other non-Kafka based implementation of the broker.
Feature: Issue 11599
Change-Id: I15d3175f3250698bba3885823fcc9196d640340d
diff --git a/BUILD b/BUILD
index 6736634..99a5344 100644
--- a/BUILD
+++ b/BUILD
@@ -19,6 +19,7 @@
deps = [
"@kafka-client//jar",
"@global-refdb//jar",
+ "@events-broker//jar",
"//plugins/replication",
],
)
@@ -47,5 +48,6 @@
"@testcontainers-kafka//jar",
"//lib/testcontainers",
"@global-refdb//jar",
+ "@events-broker//jar",
],
)
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl
index 15cdb05..6b60626 100644
--- a/external_plugin_deps.bzl
+++ b/external_plugin_deps.bzl
@@ -24,3 +24,9 @@
artifact = "com.gerritforge:global-refdb:0.1.1",
sha1 = "d6ab59906db7b20a52c8994502780b2a6ab23872",
)
+
+ maven_jar(
+ name = "events-broker",
+ artifact = "com.gerritforge:events-broker:3.0.0",
+ sha1 = "8957403a97df5400cf9bd49b3979049dde4b3435",
+ )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
index db7dd63..8e94a51 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Log4jMessageLogger.java
@@ -14,12 +14,12 @@
package com.googlesource.gerrit.plugins.multisite;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gerrit.extensions.systemstatus.ServerInformation;
import com.google.gerrit.server.util.PluginLogFile;
import com.google.gerrit.server.util.SystemLog;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import org.apache.log4j.PatternLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
index 8b07115..560d6f6 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/MessageLogger.java
@@ -14,7 +14,7 @@
package com.googlesource.gerrit.plugins.multisite;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
public interface MessageLogger {
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
index f0ad970..045c11e 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/PluginModule.java
@@ -14,13 +14,13 @@
package com.googlesource.gerrit.plugins.multisite;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.extensions.events.ProjectDeletedListener;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
import com.google.inject.Scopes;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerApi;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
import com.googlesource.gerrit.plugins.multisite.validation.ProjectDeletedSharedDbCleanup;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
deleted file mode 100644
index 35350e9..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.broker;
-
-import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import java.util.function.Consumer;
-
-/** API for sending/receiving events through a message Broker. */
-public interface BrokerApi {
-
- /**
- * Send an event to a topic.
- *
- * @param topic
- * @param event
- * @return true if the event was successfully sent. False otherwise.
- */
- boolean send(String topic, Event event);
-
- /**
- * Receive asynchronously events from a topic.
- *
- * @param topic
- * @param eventConsumer
- */
- void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
index 19cf0f7..d7a0429 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiNoOp.java
@@ -14,8 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gerrit.server.events.Event;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import java.util.function.Consumer;
public class BrokerApiNoOp implements BrokerApi {
@@ -26,5 +27,5 @@
}
@Override
- public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
+ public void receiveAsync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index e83fe53..9b84629 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -14,10 +14,11 @@
package com.googlesource.gerrit.plugins.multisite.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import java.util.function.Consumer;
public class BrokerApiWrapper implements BrokerApi {
@@ -46,7 +47,7 @@
}
@Override
- public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
- apiDelegate.get().receiveAync(topic, eventConsumer);
+ public void receiveAsync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+ apiDelegate.get().receiveAsync(topic, eventConsumer);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
index 6983984..093b920 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerModule.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
index ba5b532..e1c8bf9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/kafka/BrokerPublisher.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.broker.kafka;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
@@ -26,7 +27,6 @@
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerSession;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.Context;
import java.util.UUID;
import org.slf4j.Logger;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 7854aab..4591fff 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.permissions.PermissionBackendException;
@@ -21,7 +23,6 @@
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
@@ -60,7 +61,7 @@
@Override
public void run() {
- brokerApi.receiveAync(getTopic().topic(), this::processRecord);
+ brokerApi.receiveAsync(getTopic().topic(), this::processRecord);
}
protected abstract EventTopic getTopic();
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 680e8ed..80aee02 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -14,6 +14,8 @@
package com.googlesource.gerrit.plugins.multisite.consumer;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
+
public interface DroppedEventListener {
/**
* Invoked when any event is dropped.
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
deleted file mode 100644
index b8bd0d8..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/SourceAwareEventWrapper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright (C) 2019 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.multisite.consumer;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.gerrit.server.events.Event;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import java.util.UUID;
-
-public class SourceAwareEventWrapper {
-
- private final EventHeader header;
- private final JsonObject body;
-
- public EventHeader getHeader() {
- return header;
- }
-
- public JsonObject getBody() {
- return body;
- }
-
- public Event getEventBody(Gson gson) {
- return gson.fromJson(this.body, Event.class);
- }
-
- public static class EventHeader {
- private final UUID eventId;
- private final String eventType;
- private final UUID sourceInstanceId;
- private final Long eventCreatedOn;
-
- public EventHeader(UUID eventId, String eventType, UUID sourceInstanceId, Long eventCreatedOn) {
- this.eventId = eventId;
- this.eventType = eventType;
- this.sourceInstanceId = sourceInstanceId;
- this.eventCreatedOn = eventCreatedOn;
- }
-
- public UUID getEventId() {
- return eventId;
- }
-
- public String getEventType() {
- return eventType;
- }
-
- public UUID getSourceInstanceId() {
- return sourceInstanceId;
- }
-
- public Long getEventCreatedOn() {
- return eventCreatedOn;
- }
-
- public void validate() {
- requireNonNull(eventId, "EventId cannot be null");
- requireNonNull(eventType, "EventType cannot be null");
- requireNonNull(sourceInstanceId, "Source Instance ID cannot be null");
- }
-
- @Override
- public String toString() {
- return "{"
- + "eventId="
- + eventId
- + ", eventType='"
- + eventType
- + '\''
- + ", sourceInstanceId="
- + sourceInstanceId
- + ", eventCreatedOn="
- + eventCreatedOn
- + '}';
- }
- }
-
- public SourceAwareEventWrapper(EventHeader header, JsonObject body) {
- this.header = header;
- this.body = body;
- }
-
- public void validate() {
- requireNonNull(header, "Header cannot be null");
- requireNonNull(body, "Body cannot be null");
- header.validate();
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
index dff21c1..dc64ced 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerCacheEvictionForwarder.java
@@ -14,9 +14,9 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.CacheEvictionEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
index c2cc3dc..348af20 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerIndexEventForwarder.java
@@ -14,8 +14,8 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.inject.Inject;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
index 1a8b652..12606e8 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerProjectListUpdateForwarder.java
@@ -16,9 +16,9 @@
import static com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic.PROJECT_LIST_TOPIC;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.ProjectListUpdateForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ProjectListUpdateEvent;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
index ed3a717..05eeedd 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/forwarder/broker/BrokerStreamEventForwarder.java
@@ -14,10 +14,10 @@
package com.googlesource.gerrit.plugins.multisite.forwarder.broker;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.StreamEventForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
index ff23b78..8dd853b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
@@ -14,13 +14,13 @@
package com.googlesource.gerrit.plugins.multisite.kafka;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
import com.google.inject.Provider;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
import java.util.ArrayList;
@@ -47,7 +47,7 @@
}
@Override
- public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+ public void receiveAsync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
KafkaEventSubscriber subscriber = subscriberProvider.get();
synchronized (subscribers) {
subscribers.add(subscriber);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
index 665ad49..341f45d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerModule.java
@@ -14,6 +14,7 @@
package com.googlesource.gerrit.plugins.multisite.kafka;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.TypeLiteral;
@@ -24,7 +25,6 @@
import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheEvictionForwarder;
import com.googlesource.gerrit.plugins.multisite.forwarder.IndexEventForwarder;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
index 38b8d61..7b3be66 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializer.java
@@ -14,11 +14,11 @@
package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
index c72fa0d..69d7d47 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventSubscriber.java
@@ -15,12 +15,12 @@
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.util.ManualRequestContext;
import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.inject.Inject;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
index 2abc7b0..194e891 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapperTest.java
@@ -5,6 +5,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 9c7b8a9..a8e7b3a 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -19,6 +19,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.gerritforge.gerrit.eventbroker.BrokerApi;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gerrit.acceptance.AbstractDaemonTest;
import com.google.gerrit.acceptance.GerritConfig;
import com.google.gerrit.acceptance.LogThreshold;
@@ -46,12 +48,10 @@
import com.googlesource.gerrit.plugins.multisite.GitModule;
import com.googlesource.gerrit.plugins.multisite.Module;
import com.googlesource.gerrit.plugins.multisite.PluginModule;
-import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApiWrapper;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerModule;
import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberModule;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaBrokerModule;
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
index 239b586..05a67db 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaEventDeserializerTest.java
@@ -17,9 +17,9 @@
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.gerritforge.gerrit.eventbroker.SourceAwareEventWrapper;
import com.google.gson.Gson;
import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import java.util.UUID;
import org.junit.Before;
import org.junit.Test;