Use BrokerApi for events consuming
Leverage the new BrokerApi for consuming event through
a generic message broker.
Get rid of the references to the KafkaEventSubscriber class.
Feature: Issue 10829
Change-Id: I36c40de5b03363f06315b89380c4918e0a1ae7a2
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 2e0fb1b..15abfac 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -122,6 +122,7 @@
install(new BrokerModule());
DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
+ listener().to(KafkaBrokerApi.class);
install(kafkaForwardedEventRouterModule);
install(kafkaBrokerForwarderModule);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
index d6e6c4c..35350e9 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApi.java
@@ -15,6 +15,7 @@
package com.googlesource.gerrit.plugins.multisite.broker;
import com.google.gerrit.server.events.Event;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import java.util.function.Consumer;
/** API for sending/receiving events through a message Broker. */
@@ -35,5 +36,5 @@
* @param topic
* @param eventConsumer
*/
- void receiveAync(String topic, Consumer<Event> eventConsumer);
+ void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer);
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
index 6dc9577..e83fe53 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/broker/BrokerApiWrapper.java
@@ -17,6 +17,7 @@
import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import java.util.function.Consumer;
public class BrokerApiWrapper implements BrokerApi {
@@ -45,7 +46,7 @@
}
@Override
- public void receiveAync(String topic, Consumer<Event> eventConsumer) {
+ public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
apiDelegate.get().receiveAync(topic, eventConsumer);
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
similarity index 84%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
index 569a864..6e70299 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/AbstractKafkaSubcriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/AbstractSubcriber.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.registration.DynamicSet;
@@ -22,19 +22,19 @@
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
import com.googlesource.gerrit.plugins.multisite.MessageLogger.Direction;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.CacheNotFoundException;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ForwardedEventRouter;
+
import java.io.IOException;
import java.util.UUID;
-public abstract class AbstractKafkaSubcriber implements Runnable {
+public abstract class AbstractSubcriber implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final KafkaEventSubscriber subscriber;
+ private final BrokerApi brokerApi;
private final ForwardedEventRouter eventRouter;
private final DynamicSet<DroppedEventListener> droppedEventListeners;
private final Gson gson;
@@ -42,8 +42,8 @@
private final MessageLogger msgLog;
private SubscriberMetrics subscriberMetrics;
- public AbstractKafkaSubcriber(
- KafkaEventSubscriber subscriber,
+ public AbstractSubcriber(
+ BrokerApi brokerApi,
ForwardedEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
@@ -56,12 +56,12 @@
this.instanceId = instanceId;
this.msgLog = msgLog;
this.subscriberMetrics = subscriberMetrics;
- this.subscriber = subscriber;
+ this.brokerApi = brokerApi;
}
@Override
public void run() {
- subscriber.subscribe(getTopic(), this::processRecord);
+ brokerApi.receiveAync(getTopic().topic(), this::processRecord);
}
protected abstract EventTopic getTopic();
@@ -89,9 +89,4 @@
}
}
}
-
- // Shutdown hook which can be called from a separate thread
- public void shutdown() {
- subscriber.shutdown();
- }
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
similarity index 83%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
index 26725d8..fc62e6d 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaCacheEvictionEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/CacheEvictionEventSubscriber.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gson.Gson;
@@ -20,17 +20,17 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import java.util.UUID;
@Singleton
-public class KafkaCacheEvictionEventSubscriber extends AbstractKafkaSubcriber {
+public class CacheEvictionEventSubscriber extends AbstractSubcriber {
@Inject
- public KafkaCacheEvictionEventSubscriber(
- KafkaEventSubscriber subscriber,
+ public CacheEvictionEventSubscriber(
+ BrokerApi brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
@@ -38,7 +38,7 @@
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- subscriber,
+ brokerApi,
eventRouter,
droppedEventListeners,
gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
similarity index 88%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
index 10422a7..936d07a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ConsumerExecutor.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ConsumerExecutor.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
@@ -21,4 +21,4 @@
@Retention(RUNTIME)
@BindingAnnotation
-@interface ConsumerExecutor {}
+public @interface ConsumerExecutor {}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
similarity index 84%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
index 7de8487..680e8ed 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/DroppedEventListener.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/DroppedEventListener.java
@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
-
-import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+package com.googlesource.gerrit.plugins.multisite.consumer;
public interface DroppedEventListener {
/**
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
similarity index 86%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
index 28d068e..c201f65 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/IndexEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/IndexEventSubscriber.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gson.Gson;
@@ -20,17 +20,17 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.IndexEventRouter;
import java.util.UUID;
@Singleton
-public class IndexEventSubscriber extends AbstractKafkaSubcriber {
+public class IndexEventSubscriber extends AbstractSubcriber {
@Inject
public IndexEventSubscriber(
- KafkaEventSubscriber subscriber,
+ BrokerApi brokerApi,
IndexEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gsonProvider,
@@ -38,7 +38,7 @@
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- subscriber,
+ brokerApi,
eventRouter,
droppedEventListeners,
gsonProvider,
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
similarity index 73%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
index 5229e54..ddaa3d4 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/MultiSiteKafkaConsumerRunner.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/MultiSiteConsumerRunner.java
@@ -12,25 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import java.util.concurrent.Executor;
+
+import java.util.concurrent.ExecutorService;
@Singleton
-public class MultiSiteKafkaConsumerRunner implements LifecycleListener {
+public class MultiSiteConsumerRunner implements LifecycleListener {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private final DynamicSet<AbstractKafkaSubcriber> consumers;
- private final Executor executor;
+ private final DynamicSet<AbstractSubcriber> consumers;
+ private final ExecutorService executor;
@Inject
- public MultiSiteKafkaConsumerRunner(
- @ConsumerExecutor Executor executor, DynamicSet<AbstractKafkaSubcriber> consumers) {
+ public MultiSiteConsumerRunner(
+ @ConsumerExecutor ExecutorService executor, DynamicSet<AbstractSubcriber> consumers) {
this.consumers = consumers;
this.executor = executor;
}
@@ -44,6 +45,6 @@
@Override
public void stop() {
logger.atInfo().log("shutting down consumers");
- this.consumers.forEach(c -> c.shutdown());
+ executor.shutdown();
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
similarity index 79%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
index e47a515..4157609 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/ProjectUpdateEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/ProjectUpdateEventSubscriber.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gson.Gson;
@@ -20,17 +20,17 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.ProjectListUpdateRouter;
import java.util.UUID;
@Singleton
-public class ProjectUpdateEventSubscriber extends AbstractKafkaSubcriber {
+public class ProjectUpdateEventSubscriber extends AbstractSubcriber {
@Inject
public ProjectUpdateEventSubscriber(
- KafkaEventSubscriber subscriber,
+ BrokerApi brokerApi,
ProjectListUpdateRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
@@ -38,13 +38,7 @@
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- subscriber,
- eventRouter,
- droppedEventListeners,
- gson,
- instanceId,
- msgLog,
- subscriberMetrics);
+ brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
similarity index 79%
rename from src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
rename to src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
index f07d969..af0f3e7 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/StreamEventSubscriber.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/consumer/StreamEventSubscriber.java
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.googlesource.gerrit.plugins.multisite.kafka.consumer;
+package com.googlesource.gerrit.plugins.multisite.consumer;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gson.Gson;
@@ -20,17 +20,17 @@
import com.google.inject.Singleton;
import com.googlesource.gerrit.plugins.multisite.InstanceId;
import com.googlesource.gerrit.plugins.multisite.MessageLogger;
+import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
-import com.googlesource.gerrit.plugins.multisite.consumer.SubscriberMetrics;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.router.StreamEventRouter;
import java.util.UUID;
@Singleton
-public class StreamEventSubscriber extends AbstractKafkaSubcriber {
+public class StreamEventSubscriber extends AbstractSubcriber {
@Inject
public StreamEventSubscriber(
- KafkaEventSubscriber subscriber,
+ BrokerApi brokerApi,
StreamEventRouter eventRouter,
DynamicSet<DroppedEventListener> droppedEventListeners,
@BrokerGson Gson gson,
@@ -38,13 +38,7 @@
MessageLogger msgLog,
SubscriberMetrics subscriberMetrics) {
super(
- subscriber,
- eventRouter,
- droppedEventListeners,
- gson,
- instanceId,
- msgLog,
- subscriberMetrics);
+ brokerApi, eventRouter, droppedEventListeners, gson, instanceId, msgLog, subscriberMetrics);
}
@Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
index d9b5d10..ff23b78 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/KafkaBrokerApi.java
@@ -14,20 +14,31 @@
package com.googlesource.gerrit.plugins.multisite.kafka;
-import com.google.gerrit.extensions.restapi.NotImplementedException;
+import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.events.Event;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerApi;
import com.googlesource.gerrit.plugins.multisite.broker.kafka.BrokerPublisher;
+import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
+import com.googlesource.gerrit.plugins.multisite.kafka.consumer.KafkaEventSubscriber;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Consumer;
-public class KafkaBrokerApi implements BrokerApi {
+public class KafkaBrokerApi implements BrokerApi, LifecycleListener {
private final BrokerPublisher publisher;
+ private final Provider<KafkaEventSubscriber> subscriberProvider;
+ private List<KafkaEventSubscriber> subscribers;
@Inject
- public KafkaBrokerApi(BrokerPublisher publisher) {
+ public KafkaBrokerApi(
+ BrokerPublisher publisher, Provider<KafkaEventSubscriber> subscriberProvider) {
this.publisher = publisher;
+ this.subscriberProvider = subscriberProvider;
+ subscribers = new ArrayList<>();
}
@Override
@@ -36,7 +47,21 @@
}
@Override
- public void receiveAync(String topic, Consumer<Event> eventConsumer) {
- throw new NotImplementedException();
+ public void receiveAync(String topic, Consumer<SourceAwareEventWrapper> eventConsumer) {
+ KafkaEventSubscriber subscriber = subscriberProvider.get();
+ synchronized (subscribers) {
+ subscribers.add(subscriber);
+ }
+ subscriber.subscribe(EventTopic.of(topic), eventConsumer);
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {
+ for (KafkaEventSubscriber subscriber : subscribers) {
+ subscriber.shutdown();
+ }
}
}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
index b35231e..911361c 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/KafkaConsumerModule.java
@@ -18,11 +18,19 @@
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.inject.Inject;
import com.google.inject.TypeLiteral;
+import com.googlesource.gerrit.plugins.multisite.consumer.AbstractSubcriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.CacheEvictionEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.ConsumerExecutor;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
+import com.googlesource.gerrit.plugins.multisite.consumer.IndexEventSubscriber;
+import com.googlesource.gerrit.plugins.multisite.consumer.MultiSiteConsumerRunner;
+import com.googlesource.gerrit.plugins.multisite.consumer.ProjectUpdateEventSubscriber;
import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
+import com.googlesource.gerrit.plugins.multisite.consumer.StreamEventSubscriber;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.EventTopic;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.MultiSiteEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -43,26 +51,24 @@
bind(new TypeLiteral<Deserializer<SourceAwareEventWrapper>>() {})
.to(KafkaEventDeserializer.class);
- bind(Executor.class)
+ bind(ExecutorService.class)
.annotatedWith(ConsumerExecutor.class)
.toInstance(Executors.newFixedThreadPool(EventTopic.values().length));
- listener().to(MultiSiteKafkaConsumerRunner.class);
+ listener().to(MultiSiteConsumerRunner.class);
- DynamicSet.setOf(binder(), AbstractKafkaSubcriber.class);
+ DynamicSet.setOf(binder(), AbstractSubcriber.class);
if (config.kafkaSubscriber().enabledEvent(EventTopic.INDEX_TOPIC)) {
- DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(IndexEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(IndexEventSubscriber.class);
}
if (config.kafkaSubscriber().enabledEvent(EventTopic.STREAM_EVENT_TOPIC)) {
- DynamicSet.bind(binder(), AbstractKafkaSubcriber.class).to(StreamEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(StreamEventSubscriber.class);
}
if (config.kafkaSubscriber().enabledEvent(EventTopic.CACHE_TOPIC)) {
- DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
- .to(KafkaCacheEvictionEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(CacheEvictionEventSubscriber.class);
}
if (config.kafkaSubscriber().enabledEvent(EventTopic.PROJECT_LIST_TOPIC)) {
- DynamicSet.bind(binder(), AbstractKafkaSubcriber.class)
- .to(ProjectUpdateEventSubscriber.class);
+ DynamicSet.bind(binder(), AbstractSubcriber.class).to(ProjectUpdateEventSubscriber.class);
}
DynamicSet.setOf(binder(), DroppedEventListener.class);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index 4606fdb..b69f9e6 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -42,6 +42,7 @@
import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
import com.googlesource.gerrit.plugins.multisite.broker.BrokerGson;
import com.googlesource.gerrit.plugins.multisite.broker.kafka.KafkaBrokerForwarderModule;
+import com.googlesource.gerrit.plugins.multisite.consumer.DroppedEventListener;
import com.googlesource.gerrit.plugins.multisite.consumer.SourceAwareEventWrapper;
import com.googlesource.gerrit.plugins.multisite.forwarder.events.ChangeIndexEvent;
import com.googlesource.gerrit.plugins.multisite.kafka.KafkaConfiguration;