Handle single publisher instance

Just instantiate a single publisher instance directly
without the need of factory indirections for session and publisher

Change-Id: I7fb21a2185451531634a282ac0fc1705645ece19
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index c44e6b5..84eb600 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -14,42 +14,28 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
-import java.util.ArrayList;
-import java.util.List;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.message.Publisher;
-import com.googlesource.gerrit.plugins.kafka.message.PublisherFactory;
 
 @Singleton
 public class Manager implements LifecycleListener {
 
-  private final PublisherFactory publisherFactory;
-  private final KafkaProperties properties;
-  private final List<Publisher> publisherList = new ArrayList<>();
+  private final Publisher publisher;
 
   @Inject
-  public Manager(
-      PublisherFactory publisherFactory,
-      KafkaProperties properties) {
-    this.publisherFactory = publisherFactory;
-    this.properties = properties;
+  public Manager(Publisher publisher) {
+    this.publisher = publisher;
   }
 
   @Override
   public void start() {
-    Publisher publisher = publisherFactory.create(properties);
     publisher.start();
-    publisherList.add(publisher);
   }
 
   @Override
   public void stop() {
-    for (Publisher publisher : publisherList) {
-      publisher.stop();
-    }
-    publisherList.clear();
+    publisher.stop();
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 21c7b15..7cf75fb 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,30 +14,20 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
-import com.google.gerrit.extensions.config.FactoryModule;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
+import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
-
-import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
-import com.googlesource.gerrit.plugins.kafka.config.KafkaPropertiesProvider;
 import com.googlesource.gerrit.plugins.kafka.message.GsonProvider;
 import com.googlesource.gerrit.plugins.kafka.message.Publisher;
-import com.googlesource.gerrit.plugins.kafka.message.PublisherFactory;
-import com.googlesource.gerrit.plugins.kafka.session.KafkaSessionFactory;
 
-class Module extends FactoryModule {
+class Module extends AbstractModule {
 
   @Override
   protected void configure() {
-    factory(KafkaSessionFactory.class);
-    factory(PublisherFactory.class);
-    bind(KafkaProperties.class).toProvider(KafkaPropertiesProvider.class)
-        .in(Singleton.class);
     bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
-
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
     DynamicSet.bind(binder(), EventListener.class).to(Publisher.class);
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index d519e03..24d9438 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -15,19 +15,23 @@
 package com.googlesource.gerrit.plugins.kafka.config;
 
 import com.google.common.base.CaseFormat;
+import com.google.gerrit.extensions.annotations.PluginName;
 import com.google.gerrit.server.config.PluginConfig;
+import com.google.gerrit.server.config.PluginConfigFactory;
+import com.google.inject.Inject;
 
 public class KafkaProperties extends java.util.Properties {
   private static final long serialVersionUID = 0L;
 
   private final String topic;
 
-  public KafkaProperties(PluginConfig config) {
+  @Inject
+  public KafkaProperties(PluginConfigFactory configFactory, @PluginName String pluginName) {
     super();
     setDefaults();
-    applyConfig(config);
-
-    topic = config.getString("topic", "gerrit");
+    PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
+    topic = fromGerritConfig.getString("topic", "gerrit");
+    applyConfig(fromGerritConfig);
   }
 
   private void setDefaults() {
@@ -36,18 +40,15 @@
     put("batch.size", 16384);
     put("linger.ms", 1);
     put("buffer.memory", 33554432);
-    put("key.serializer",
-        "org.apache.kafka.common.serialization.StringSerializer");
-    put("value.serializer",
-        "org.apache.kafka.common.serialization.StringSerializer");
+    put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   }
 
   private void applyConfig(PluginConfig config) {
     for (String name : config.getNames()) {
       Object value = config.getString(name);
       String propName =
-          CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, name).replaceAll(
-              "-", ".");
+          CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_HYPHEN, name).replaceAll("-", ".");
       put(propName, value);
     }
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPropertiesProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPropertiesProvider.java
deleted file mode 100644
index a596dde..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaPropertiesProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.config;
-
-import com.google.gerrit.extensions.annotations.PluginName;
-import com.google.gerrit.server.config.PluginConfigFactory;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-
-
-public class KafkaPropertiesProvider implements Provider<KafkaProperties> {
-
-  private final PluginConfigFactory configFactory;
-  private final String pluginName;
-
-  @Inject
-  public KafkaPropertiesProvider(PluginConfigFactory configFactory,
-      @PluginName String pluginName) {
-    this.configFactory = configFactory;
-    this.pluginName = pluginName;
-  }
-
-  @Override
-  public KafkaProperties get() {
-    return new KafkaProperties(configFactory.getFromGerritConfig(pluginName));
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java
index 8372f15..25de2dc 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/message/Publisher.java
@@ -18,26 +18,19 @@
 import com.google.gerrit.server.events.EventListener;
 import com.google.gson.Gson;
 import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
+import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSession;
-import com.googlesource.gerrit.plugins.kafka.session.KafkaSessionFactory;
 
+@Singleton
 public class Publisher implements EventListener {
 
   private final KafkaSession session;
-  private final KafkaProperties properties;
   private final Gson gson;
   private boolean available = true;
 
   @Inject
-  public Publisher(
-      KafkaSessionFactory kafkaSessionFactory,
-      Gson gson,
-      @Assisted KafkaProperties properties) {
-    this.session = kafkaSessionFactory.create(properties);
-    this.properties = properties;
+  public Publisher(KafkaSession kafkaSession, Gson gson) {
+    this.session = kafkaSession;
     this.gson = gson;
   }
 
@@ -76,10 +69,6 @@
     return session;
   }
 
-  public KafkaProperties getProperties() {
-    return properties;
-  }
-
   public String getName() {
     return "Kafka";
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/PublisherFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/message/PublisherFactory.java
deleted file mode 100644
index f365416..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/message/PublisherFactory.java
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.message;
-
-import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
-
-public interface PublisherFactory {
-  Publisher create(KafkaProperties properties);
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
index 2abfffa..ae8aa2b 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -15,7 +15,6 @@
 package com.googlesource.gerrit.plugins.kafka.session;
 
 import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -25,13 +24,12 @@
 
 public final class KafkaSession {
 
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(KafkaSession.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSession.class);
   private final KafkaProperties properties;
   private volatile Producer<String, String> producer;
 
   @Inject
-  public KafkaSession(@Assisted KafkaProperties properties) {
+  public KafkaSession(KafkaProperties properties) {
     this.properties = properties;
   }
 
@@ -48,16 +46,14 @@
       return;
     }
 
-    LOGGER.info("Connect to {}...",
-        properties.getProperty("bootstrap.servers"));
+    LOGGER.info("Connect to {}...", properties.getProperty("bootstrap.servers"));
     setConnectionClassLoader();
     producer = new KafkaProducer<>(properties);
     LOGGER.info("Connection established.");
   }
 
   private void setConnectionClassLoader() {
-    Thread.currentThread().setContextClassLoader(
-        KafkaSession.class.getClassLoader());
+    Thread.currentThread().setContextClassLoader(KafkaSession.class.getClassLoader());
   }
 
   public void disconnect() {
@@ -70,7 +66,6 @@
   }
 
   public void publish(String messageBody) {
-    producer.send(new ProducerRecord<>(properties.getTopic(), "" + System.nanoTime(),
-        messageBody));
+    producer.send(new ProducerRecord<>(properties.getTopic(), "" + System.nanoTime(), messageBody));
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSessionFactory.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSessionFactory.java
deleted file mode 100644
index c002b5c..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSessionFactory.java
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.session;
-
-import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
-
-public interface KafkaSessionFactory {
-  KafkaSession create(KafkaProperties properties);
-}