Remove event worker abstraction

Remove event worker abstraction that only purpose is to delegate event
processing to publisher and bind publisher instance itself as event
listener.

Change-Id: Ic530a9b0d271ae230c5edb3a684b0b899091cb5d
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
index bdbe81b..c44e6b5 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Manager.java
@@ -14,32 +14,26 @@
 
 package com.googlesource.gerrit.plugins.kafka;
 
+import java.util.ArrayList;
+import java.util.List;
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.message.Publisher;
 import com.googlesource.gerrit.plugins.kafka.message.PublisherFactory;
-import com.googlesource.gerrit.plugins.kafka.worker.DefaultEventWorker;
-import com.googlesource.gerrit.plugins.kafka.worker.EventWorker;
-
-import java.util.ArrayList;
-import java.util.List;
 
 @Singleton
 public class Manager implements LifecycleListener {
 
-  private final EventWorker defaultEventWorker;
   private final PublisherFactory publisherFactory;
   private final KafkaProperties properties;
   private final List<Publisher> publisherList = new ArrayList<>();
 
   @Inject
-  public Manager(DefaultEventWorker defaultEventWorker,
+  public Manager(
       PublisherFactory publisherFactory,
       KafkaProperties properties) {
-    this.defaultEventWorker = defaultEventWorker;
     this.publisherFactory = publisherFactory;
     this.properties = properties;
   }
@@ -48,7 +42,6 @@
   public void start() {
     Publisher publisher = publisherFactory.create(properties);
     publisher.start();
-    defaultEventWorker.addPublisher(publisher);
     publisherList.add(publisher);
   }
 
@@ -56,7 +49,6 @@
   public void stop() {
     for (Publisher publisher : publisherList) {
       publisher.stop();
-      defaultEventWorker.removePublisher(publisher);
     }
     publisherList.clear();
   }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
index 03acb16..21c7b15 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -24,9 +24,9 @@
 import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
 import com.googlesource.gerrit.plugins.kafka.config.KafkaPropertiesProvider;
 import com.googlesource.gerrit.plugins.kafka.message.GsonProvider;
+import com.googlesource.gerrit.plugins.kafka.message.Publisher;
 import com.googlesource.gerrit.plugins.kafka.message.PublisherFactory;
 import com.googlesource.gerrit.plugins.kafka.session.KafkaSessionFactory;
-import com.googlesource.gerrit.plugins.kafka.worker.DefaultEventWorker;
 
 class Module extends FactoryModule {
 
@@ -39,6 +39,6 @@
     bind(Gson.class).toProvider(GsonProvider.class).in(Singleton.class);
 
     DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class);
-    DynamicSet.bind(binder(), EventListener.class).to(DefaultEventWorker.class);
+    DynamicSet.bind(binder(), EventListener.class).to(Publisher.class);
   }
 }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/worker/DefaultEventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/worker/DefaultEventWorker.java
deleted file mode 100644
index 2755c3a..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/worker/DefaultEventWorker.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.worker;
-
-import com.google.gerrit.server.events.Event;
-import com.google.gerrit.server.events.EventListener;
-import com.google.inject.Singleton;
-
-import com.googlesource.gerrit.plugins.kafka.message.Publisher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-@Singleton
-public class DefaultEventWorker implements EventListener, EventWorker {
-  private static final Logger log = LoggerFactory
-      .getLogger(DefaultEventWorker.class);
-  private final Set<Publisher> publishers = new CopyOnWriteArraySet<>();
-
-  @Override
-  public void addPublisher(Publisher publisher) {
-    publishers.add(publisher);
-  }
-
-  @Override
-  public void removePublisher(Publisher publisher) {
-    publishers.remove(publisher);
-  }
-
-  @Override
-  public void clear() {
-    publishers.clear();
-  }
-
-  @Override
-  public void onEvent(Event event) {
-    for (Publisher publisher : publishers) {
-      try {
-        publisher.onEvent(event);
-      } catch (Exception e) {
-        log.error("Unable to process event " + e + " through publisher "
-            + publisher, e);
-      }
-    }
-  }
-}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/worker/EventWorker.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/worker/EventWorker.java
deleted file mode 100644
index eb7afa7..0000000
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/worker/EventWorker.java
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright (C) 2016 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.googlesource.gerrit.plugins.kafka.worker;
-
-import com.googlesource.gerrit.plugins.kafka.message.Publisher;
-
-public interface EventWorker {
-  void addPublisher(Publisher publisher);
-  void removePublisher(Publisher publisher);
-  void clear();
-}