Simplify event publishing
Eliminate Publisher interface as there is only one implementation class
currently exists (MessagePublisher) and use it directly.
Change-Id: I3133777be962121750ecaf515509fe74f7ed64fd
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index e7ddbb6..53c08e5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,32 +14,28 @@
package com.googlesource.gerrit.plugins.kafka;
+import com.google.gerrit.extensions.config.FactoryModule;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.events.EventListener;
import com.google.gson.Gson;
-import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.config.KafkaPropertiesProvider;
import com.googlesource.gerrit.plugins.kafka.message.GsonProvider;
-import com.googlesource.gerrit.plugins.kafka.message.MessagePublisher;
-import com.googlesource.gerrit.plugins.kafka.message.Publisher;
import com.googlesource.gerrit.plugins.kafka.message.PublisherFactory;
import com.googlesource.gerrit.plugins.kafka.session.KafkaSessionFactory;
import com.googlesource.gerrit.plugins.kafka.session.SessionFactoryProvider;
import com.googlesource.gerrit.plugins.kafka.worker.DefaultEventWorker;
-class Module extends AbstractModule {
+class Module extends FactoryModule {
@Override
protected void configure() {
bind(KafkaSessionFactory.class).toProvider(SessionFactoryProvider.class);
- install(new FactoryModuleBuilder().implement(Publisher.class,
- MessagePublisher.class).build(PublisherFactory.class));
+ factory(PublisherFactory.class);
bind(KafkaProperties.class).toProvider(KafkaPropertiesProvider.class)
.in(Singleton.class);
bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/MessagePublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/message/MessagePublisher.java
deleted file mode 100644
index 23c631c..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/MessagePublisher.java
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.message;
-
-import com.google.gerrit.server.events.Event;
-import com.google.gson.Gson;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
-import com.googlesource.gerrit.plugins.kafka.session.Session;
-import com.googlesource.gerrit.plugins.kafka.session.SessionFactoryProvider;
-
-public class MessagePublisher implements Publisher {
-
- private final Session session;
- private final KafkaProperties properties;
- private final Gson gson;
- private boolean available = true;
-
- @Inject
- public MessagePublisher(
- SessionFactoryProvider sessionFactoryProvider,
- Gson gson,
- @Assisted KafkaProperties properties) {
- this.session = sessionFactoryProvider.get().create(properties);
- this.properties = properties;
- this.gson = gson;
- }
-
- @Override
- public void start() {
- if (!session.isOpen()) {
- session.connect();
- available = true;
- }
- }
-
- @Override
- public void stop() {
- session.disconnect();
- available = false;
- }
-
- @Override
- public void onEvent(Event event) {
- if (available && session.isOpen()) {
- session.publish(gson.toJson(event));
- }
- }
-
- @Override
- public void enable() {
- available = true;
- }
-
- @Override
- public void disable() {
- available = false;
- }
-
- @Override
- public boolean isEnabled() {
- return available;
- }
-
- @Override
- public Session getSession() {
- return session;
- }
-
- @Override
- public KafkaProperties getProperties() {
- return properties;
- }
-
- @Override
- public String getName() {
- return "Kafka";
- }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java
index 086fa4c..99d2943 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java
@@ -14,18 +14,73 @@
package com.googlesource.gerrit.plugins.kafka.message;
+import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventListener;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import com.googlesource.gerrit.plugins.kafka.session.Session;
+import com.googlesource.gerrit.plugins.kafka.session.SessionFactoryProvider;
-public interface Publisher extends EventListener {
- void start();
- void stop();
- void enable();
- void disable();
- boolean isEnabled();
- Session getSession();
- KafkaProperties getProperties();
- String getName();
+public class Publisher implements EventListener {
+
+ private final Session session;
+ private final KafkaProperties properties;
+ private final Gson gson;
+ private boolean available = true;
+
+ @Inject
+ public Publisher(
+ SessionFactoryProvider sessionFactoryProvider,
+ Gson gson,
+ @Assisted KafkaProperties properties) {
+ this.session = sessionFactoryProvider.get().create(properties);
+ this.properties = properties;
+ this.gson = gson;
+ }
+
+ public void start() {
+ if (!session.isOpen()) {
+ session.connect();
+ available = true;
+ }
+ }
+
+ public void stop() {
+ session.disconnect();
+ available = false;
+ }
+
+ @Override
+ public void onEvent(Event event) {
+ if (available && session.isOpen()) {
+ session.publish(gson.toJson(event));
+ }
+ }
+
+ public void enable() {
+ available = true;
+ }
+
+ public void disable() {
+ available = false;
+ }
+
+ public boolean isEnabled() {
+ return available;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ public KafkaProperties getProperties() {
+ return properties;
+ }
+
+ public String getName() {
+ return "Kafka";
+ }
}