Decouple stream events from change hooks

Stream events are currently being dispatched from the change hooks
runner.

Both stream events and change hooks will eventually be evicted from
core Gerrit to plugins. Separating the stream events from hooks will
make it easier to move them to plugins separately.

Change-Id: I00c6728fbc52557a3360408272dc84c205c53b2f
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
index 29eb533..015b4d2 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java
@@ -22,6 +22,7 @@
 import com.google.gerrit.common.ChangeHookApiListener;
 import com.google.gerrit.common.ChangeHookRunner;
 import com.google.gerrit.common.EventBroker;
+import com.google.gerrit.common.StreamEventsApiListener;
 import com.google.gerrit.gpg.GpgModule;
 import com.google.gerrit.httpd.AllRequestFilter;
 import com.google.gerrit.httpd.GerritOptions;
@@ -343,6 +344,7 @@
 
     modules.add(new WorkQueue.Module());
     modules.add(new ChangeHookApiListener.Module());
+    modules.add(new StreamEventsApiListener.Module());
     modules.add(new ChangeHookRunner.Module());
     modules.add(new EventBroker.Module());
     modules.add(new AccountPatchReviewStoreImpl.Module());
diff --git a/gerrit-server/src/main/java/com/google/gerrit/common/ChangeHookRunner.java b/gerrit-server/src/main/java/com/google/gerrit/common/ChangeHookRunner.java
index c2d7831..c5ea982 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/common/ChangeHookRunner.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/common/ChangeHookRunner.java
@@ -15,14 +15,11 @@
 package com.google.gerrit.common;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gerrit.common.data.LabelType;
 import com.google.gerrit.common.data.LabelTypes;
 import com.google.gerrit.extensions.events.LifecycleListener;
-import com.google.gerrit.extensions.registration.DynamicItem;
 import com.google.gerrit.lifecycle.LifecycleModule;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.reviewdb.client.Branch;
@@ -35,24 +32,10 @@
 import com.google.gerrit.server.config.AnonymousCowardName;
 import com.google.gerrit.server.config.GerritServerConfig;
 import com.google.gerrit.server.config.SitePaths;
-import com.google.gerrit.server.data.AccountAttribute;
-import com.google.gerrit.server.data.ApprovalAttribute;
 import com.google.gerrit.server.data.ChangeAttribute;
 import com.google.gerrit.server.data.PatchSetAttribute;
 import com.google.gerrit.server.data.RefUpdateAttribute;
-import com.google.gerrit.server.events.ChangeAbandonedEvent;
-import com.google.gerrit.server.events.ChangeMergedEvent;
-import com.google.gerrit.server.events.ChangeRestoredEvent;
-import com.google.gerrit.server.events.CommentAddedEvent;
-import com.google.gerrit.server.events.DraftPublishedEvent;
 import com.google.gerrit.server.events.EventFactory;
-import com.google.gerrit.server.events.HashtagsChangedEvent;
-import com.google.gerrit.server.events.PatchSetCreatedEvent;
-import com.google.gerrit.server.events.ProjectCreatedEvent;
-import com.google.gerrit.server.events.RefUpdatedEvent;
-import com.google.gerrit.server.events.ReviewerAddedEvent;
-import com.google.gerrit.server.events.ReviewerDeletedEvent;
-import com.google.gerrit.server.events.TopicChangedEvent;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.WorkQueue;
 import com.google.gerrit.server.project.ProjectCache;
@@ -79,7 +62,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -215,8 +197,6 @@
     /** Timeout value for synchronous hooks */
     private final int syncHookTimeout;
 
-    private DynamicItem<EventDispatcher> dispatcher;
-
     /**
      * Create a new ChangeHookRunner.
      *
@@ -234,8 +214,7 @@
       SitePaths sitePath,
       ProjectCache projectCache,
       AccountCache accountCache,
-      EventFactory eventFactory,
-      DynamicItem<EventDispatcher> dispatcher) {
+      EventFactory eventFactory) {
         this.anonymousCowardName = anonymousCowardName;
         this.repoManager = repoManager;
         this.hookQueue = queue.createQueue(1, "hook");
@@ -243,7 +222,6 @@
         this.accountCache = accountCache;
         this.eventFactory = eventFactory;
         this.sitePaths = sitePath;
-        this.dispatcher = dispatcher;
 
         Path hooksPath;
         String hooksPathConfig = config.getString("hooks", null, "path");
@@ -325,12 +303,6 @@
 
     @Override
     public void doProjectCreatedHook(Project.NameKey project, String headName) {
-      ProjectCreatedEvent event = new ProjectCreatedEvent();
-      event.projectName = project.get();
-      event.headName = headName;
-
-      dispatcher.get().postEvent(project, event);
-
       if (!projectCreatedHook.isPresent()) {
         return;
       }
@@ -345,34 +317,25 @@
     @Override
     public void doPatchsetCreatedHook(Change change,
         PatchSet patchSet, ReviewDb db) throws OrmException {
-      PatchSetCreatedEvent event = new PatchSetCreatedEvent(change);
-      Supplier<AccountState> uploader =
-          getAccountSupplier(patchSet.getUploader());
-      Supplier<AccountState> owner = getAccountSupplier(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.uploader = accountAttributeSupplier(uploader);
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!patchsetCreatedHook.isPresent()) {
         return;
       }
 
+      AccountState owner = accountCache.get(change.getOwner());
+      AccountState uploader = accountCache.get(patchSet.getUploader());
       List<String> args = new ArrayList<>();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      PatchSetAttribute ps = patchSetAttribute(change, patchSet);
 
-      ChangeAttribute c = event.change.get();
-      PatchSetAttribute ps = event.patchSet.get();
       addArg(args, "--change", c.id);
       addArg(args, "--is-draft", String.valueOf(patchSet.isDraft()));
       addArg(args, "--kind", String.valueOf(ps.kind));
       addArg(args, "--change-url", c.url);
-      addArg(args, "--change-owner", getDisplayName(owner.get().getAccount()));
+      addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
       addArg(args, "--project", c.project);
       addArg(args, "--branch", c.branch);
       addArg(args, "--topic", c.topic);
-      addArg(args, "--uploader", getDisplayName(uploader.get().getAccount()));
+      addArg(args, "--uploader", getDisplayName(uploader.getAccount()));
       addArg(args, "--commit", ps.revision);
       addArg(args, "--patchset", ps.number);
 
@@ -382,32 +345,23 @@
     @Override
     public void doDraftPublishedHook(Change change, PatchSet patchSet,
           ReviewDb db) throws OrmException {
-      DraftPublishedEvent event = new DraftPublishedEvent(change);
-      Supplier<AccountState> uploader =
-          getAccountSupplier(patchSet.getUploader());
-      Supplier<AccountState> owner = getAccountSupplier(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.uploader = accountAttributeSupplier(uploader);
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!draftPublishedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
-      PatchSetAttribute ps = event.patchSet.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      PatchSetAttribute ps = patchSetAttribute(change, patchSet);
+      AccountState owner = accountCache.get(change.getOwner());
+      AccountState uploader = accountCache.get(patchSet.getUploader());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-url", c.url);
-      addArg(args, "--change-owner", getDisplayName(owner.get().getAccount()));
+      addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
       addArg(args, "--project", c.project);
       addArg(args, "--branch", c.branch);
       addArg(args, "--topic", c.topic);
-      addArg(args, "--uploader", getDisplayName(uploader.get().getAccount()));
+      addArg(args, "--uploader", getDisplayName(uploader.getAccount()));
       addArg(args, "--commit", ps.revision);
       addArg(args, "--patchset", ps.number);
 
@@ -419,46 +373,19 @@
           PatchSet patchSet, String comment, final Map<String, Short> approvals,
           final Map<String, Short> oldApprovals, ReviewDb db)
               throws OrmException {
-      CommentAddedEvent event = new CommentAddedEvent(change);
-      Supplier<AccountState> owner = getAccountSupplier(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.author =  accountAttributeSupplier(account);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.comment = comment;
-      event.approvals = Suppliers.memoize(
-          new Supplier<ApprovalAttribute[]>() {
-            @Override
-            public ApprovalAttribute[] get() {
-              LabelTypes labelTypes = projectCache.get(
-                  change.getProject()).getLabelTypes();
-              if (approvals.size() > 0) {
-                ApprovalAttribute[] r = new ApprovalAttribute[approvals.size()];
-                int i = 0;
-                for (Map.Entry<String, Short> approval : approvals.entrySet()) {
-                  r[i++] = getApprovalAttribute(labelTypes, approval,
-                      oldApprovals);
-                }
-                return r;
-              }
-              return null;
-            }
-          });
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!commentAddedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
-      PatchSetAttribute ps = event.patchSet.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      PatchSetAttribute ps = patchSetAttribute(change, patchSet);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--is-draft", patchSet.isDraft() ? "true" : "false");
       addArg(args, "--change-url", c.url);
-      addArg(args, "--change-owner", getDisplayName(owner.get().getAccount()));
+      addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
       addArg(args, "--project", c.project);
       addArg(args, "--branch", c.branch);
       addArg(args, "--topic", c.topic);
@@ -487,27 +414,18 @@
     public void doChangeMergedHook(Change change, Account account,
         PatchSet patchSet, ReviewDb db, String mergeResultRev)
         throws OrmException {
-      ChangeMergedEvent event = new ChangeMergedEvent(change);
-      Supplier<AccountState> owner = getAccountSupplier(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.submitter = accountAttributeSupplier(account);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.newRev = mergeResultRev;
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!changeMergedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
-      PatchSetAttribute ps = event.patchSet.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      PatchSetAttribute ps = patchSetAttribute(change, patchSet);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-url", c.url);
-      addArg(args, "--change-owner", getDisplayName(owner.get().getAccount()));
+      addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
       addArg(args, "--project", c.project);
       addArg(args, "--branch", c.branch);
       addArg(args, "--topic", c.topic);
@@ -522,23 +440,14 @@
     public void doChangeAbandonedHook(Change change, Account account,
           PatchSet patchSet, String reason, ReviewDb db)
           throws OrmException {
-      ChangeAbandonedEvent event = new ChangeAbandonedEvent(change);
-      AccountState owner = accountCache.get(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.abandoner = accountAttributeSupplier(account);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.reason = reason;
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!changeAbandonedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
-      PatchSetAttribute ps = event.patchSet.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      PatchSetAttribute ps = patchSetAttribute(change, patchSet);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-url", c.url);
@@ -557,23 +466,14 @@
     public void doChangeRestoredHook(Change change, Account account,
           PatchSet patchSet, String reason, ReviewDb db)
           throws OrmException {
-      ChangeRestoredEvent event = new ChangeRestoredEvent(change);
-      AccountState owner = accountCache.get(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.restorer = accountAttributeSupplier(account);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.reason = reason;
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!changeRestoredHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
-      PatchSetAttribute ps = event.patchSet.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      PatchSetAttribute ps = patchSetAttribute(change, patchSet);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-url", c.url);
@@ -591,27 +491,13 @@
     @Override
     public void doRefUpdatedHook(final Branch.NameKey refName,
         final ObjectId oldId, final ObjectId newId, Account account) {
-      RefUpdatedEvent event = new RefUpdatedEvent();
-
-      if (account != null) {
-        event.submitter = accountAttributeSupplier(account);
-      }
-      event.refUpdate = Suppliers.memoize(
-          new Supplier<RefUpdateAttribute>() {
-            @Override
-            public RefUpdateAttribute get() {
-              return eventFactory.asRefUpdateAttribute(oldId, newId, refName);
-            }
-          });
-
-      dispatcher.get().postEvent(refName, event);
-
       if (!refUpdatedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      RefUpdateAttribute r = event.refUpdate.get();
+      RefUpdateAttribute r =
+          eventFactory.asRefUpdateAttribute(oldId, newId, refName);
       addArg(args, "--oldrev", r.oldRev);
       addArg(args, "--newrev", r.newRev);
       addArg(args, "--refname", r.refName);
@@ -626,25 +512,17 @@
     @Override
     public void doReviewerAddedHook(Change change, Account account,
         PatchSet patchSet, ReviewDb db) throws OrmException {
-      ReviewerAddedEvent event = new ReviewerAddedEvent(change);
-      Supplier<AccountState> owner = getAccountSupplier(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.reviewer = accountAttributeSupplier(account);
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!reviewerAddedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-url", c.url);
-      addArg(args, "--change-owner", getDisplayName(owner.get().getAccount()));
+      addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
       addArg(args, "--project", c.project);
       addArg(args, "--branch", c.branch);
       addArg(args, "--reviewer", getDisplayName(account));
@@ -656,39 +534,12 @@
     public void doReviewerDeletedHook(final Change change, Account account,
       PatchSet patchSet, String comment, final Map<String, Short> approvals,
       final Map<String, Short> oldApprovals, ReviewDb db) throws OrmException {
-
-      ReviewerDeletedEvent event = new ReviewerDeletedEvent(change);
-      event.change = changeAttributeSupplier(change);
-      event.patchSet = patchSetAttributeSupplier(change, patchSet);
-      event.reviewer = accountAttributeSupplier(account);
-      event.comment = comment;
-      event.approvals = Suppliers.memoize(
-          new Supplier<ApprovalAttribute[]>() {
-            @Override
-            public ApprovalAttribute[] get() {
-              LabelTypes labelTypes = projectCache.get(
-                  change.getProject()).getLabelTypes();
-              if (!approvals.isEmpty()) {
-                ApprovalAttribute[] r = new ApprovalAttribute[approvals.size()];
-                int i = 0;
-                for (Map.Entry<String, Short> approval : approvals.entrySet()) {
-                  r[i++] = getApprovalAttribute(labelTypes, approval,
-                      oldApprovals);
-                }
-                return r;
-              }
-              return null;
-            }
-          });
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!reviewerDeletedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
       AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
@@ -720,21 +571,13 @@
     public void doTopicChangedHook(Change change, Account account,
         String oldTopic, ReviewDb db)
             throws OrmException {
-      TopicChangedEvent event = new TopicChangedEvent(change);
-      AccountState owner = accountCache.get(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.changer = accountAttributeSupplier(account);
-      event.oldTopic = oldTopic;
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!topicChangedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
@@ -759,23 +602,13 @@
     public void doHashtagsChangedHook(Change change, Account account,
         Set<String> added, Set<String> removed, Set<String> hashtags, ReviewDb db)
             throws OrmException {
-      HashtagsChangedEvent event = new HashtagsChangedEvent(change);
-      AccountState owner = accountCache.get(change.getOwner());
-
-      event.change = changeAttributeSupplier(change);
-      event.editor = accountAttributeSupplier(account);
-      event.hashtags = hashtagArray(hashtags);
-      event.added = hashtagArray(added);
-      event.removed = hashtagArray(removed);
-
-      dispatcher.get().postEvent(change, event, db);
-
       if (!hashtagsChangedHook.isPresent()) {
         return;
       }
 
       List<String> args = new ArrayList<>();
-      ChangeAttribute c = event.change.get();
+      ChangeAttribute c = eventFactory.asChangeAttribute(change);
+      AccountState owner = accountCache.get(change.getOwner());
 
       addArg(args, "--change", c.id);
       addArg(args, "--change-owner", getDisplayName(owner.getAccount()));
@@ -816,94 +649,16 @@
       }
     }
 
-    private Supplier<AccountState> getAccountSupplier(
-        final Account.Id account) {
-      return Suppliers.memoize(
-          new Supplier<AccountState>() {
-            @Override
-            public AccountState get() {
-              return accountCache.get(account);
-            }
-          });
-    }
-
-    private Supplier<AccountAttribute> accountAttributeSupplier(
-        final Supplier<AccountState> s) {
-      return Suppliers.memoize(
-          new Supplier<AccountAttribute>() {
-            @Override
-            public AccountAttribute get() {
-              return eventFactory.asAccountAttribute(s.get().getAccount());
-            }
-          });
-    }
-
-    private Supplier<AccountAttribute> accountAttributeSupplier(
-        final Account account) {
-      return Suppliers.memoize(
-          new Supplier<AccountAttribute>() {
-            @Override
-            public AccountAttribute get() {
-              return eventFactory.asAccountAttribute(account);
-            }
-          });
-    }
-
-    private Supplier<PatchSetAttribute> patchSetAttributeSupplier(
-        final Change change, final PatchSet patchSet) {
-      return Suppliers.memoize(
-          new Supplier<PatchSetAttribute>() {
-            @Override
-            public PatchSetAttribute get() {
-              try (Repository repo =
-                    repoManager.openRepository(change.getProject());
-                  RevWalk revWalk = new RevWalk(repo)) {
-                return eventFactory.asPatchSetAttribute(
-                    revWalk, change, patchSet);
-              } catch (IOException e) {
-                throw new RuntimeException(e);
-              }
-            }
-          });
-    }
-
-    private Supplier<ChangeAttribute> changeAttributeSupplier(
-        final Change change) {
-      return Suppliers.memoize(
-          new Supplier<ChangeAttribute>() {
-            @Override
-            public ChangeAttribute get() {
-              return eventFactory.asChangeAttribute(change);
-            }
-          });
-    }
-
-    /**
-     * Create an ApprovalAttribute for the given approval suitable for serialization to JSON.
-     * @param labelTypes
-     * @param approval
-     * @param oldApprovals
-     * @return object suitable for serialization to JSON
-     */
-    private ApprovalAttribute getApprovalAttribute(LabelTypes labelTypes,
-            Entry<String, Short> approval,
-            Map<String, Short> oldApprovals) {
-      ApprovalAttribute a = new ApprovalAttribute();
-      a.type = approval.getKey();
-
-      if (oldApprovals != null && !oldApprovals.isEmpty()) {
-        if (oldApprovals.get(approval.getKey()) != null) {
-          a.oldValue = Short.toString(oldApprovals.get(approval.getKey()));
-        }
+    private PatchSetAttribute patchSetAttribute(Change change,
+        PatchSet patchSet) {
+      try (Repository repo =
+            repoManager.openRepository(change.getProject());
+          RevWalk revWalk = new RevWalk(repo)) {
+        return eventFactory.asPatchSetAttribute(
+            revWalk, change, patchSet);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
-      LabelType lt = labelTypes.byLabel(approval.getKey());
-      if (lt != null) {
-        a.description = lt.getName();
-      }
-      if (approval.getValue() != null) {
-        a.value = Short.toString(approval.getValue());
-      }
-      return a;
     }
 
     /**
diff --git a/gerrit-server/src/main/java/com/google/gerrit/common/StreamEventsApiListener.java b/gerrit-server/src/main/java/com/google/gerrit/common/StreamEventsApiListener.java
new file mode 100644
index 0000000..85f4c59
--- /dev/null
+++ b/gerrit-server/src/main/java/com/google/gerrit/common/StreamEventsApiListener.java
@@ -0,0 +1,496 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.common;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Sets;
+import com.google.gerrit.common.data.LabelType;
+import com.google.gerrit.common.data.LabelTypes;
+import com.google.gerrit.extensions.common.AccountInfo;
+import com.google.gerrit.extensions.common.ApprovalInfo;
+import com.google.gerrit.extensions.common.ChangeInfo;
+import com.google.gerrit.extensions.common.RevisionInfo;
+import com.google.gerrit.extensions.events.ChangeAbandonedListener;
+import com.google.gerrit.extensions.events.ChangeMergedListener;
+import com.google.gerrit.extensions.events.ChangeRestoredListener;
+import com.google.gerrit.extensions.events.CommentAddedListener;
+import com.google.gerrit.extensions.events.DraftPublishedListener;
+import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
+import com.google.gerrit.extensions.events.HashtagsEditedListener;
+import com.google.gerrit.extensions.events.NewProjectCreatedListener;
+import com.google.gerrit.extensions.events.ReviewerAddedListener;
+import com.google.gerrit.extensions.events.ReviewerDeletedListener;
+import com.google.gerrit.extensions.events.RevisionCreatedListener;
+import com.google.gerrit.extensions.events.TopicEditedListener;
+import com.google.gerrit.extensions.registration.DynamicItem;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.reviewdb.client.Branch;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.PatchSet;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.PatchSetUtil;
+import com.google.gerrit.server.data.AccountAttribute;
+import com.google.gerrit.server.data.ApprovalAttribute;
+import com.google.gerrit.server.data.ChangeAttribute;
+import com.google.gerrit.server.data.PatchSetAttribute;
+import com.google.gerrit.server.data.RefUpdateAttribute;
+import com.google.gerrit.server.events.ChangeAbandonedEvent;
+import com.google.gerrit.server.events.ChangeMergedEvent;
+import com.google.gerrit.server.events.ChangeRestoredEvent;
+import com.google.gerrit.server.events.CommentAddedEvent;
+import com.google.gerrit.server.events.DraftPublishedEvent;
+import com.google.gerrit.server.events.EventFactory;
+import com.google.gerrit.server.events.HashtagsChangedEvent;
+import com.google.gerrit.server.events.PatchSetCreatedEvent;
+import com.google.gerrit.server.events.ProjectCreatedEvent;
+import com.google.gerrit.server.events.RefUpdatedEvent;
+import com.google.gerrit.server.events.ReviewerAddedEvent;
+import com.google.gerrit.server.events.ReviewerDeletedEvent;
+import com.google.gerrit.server.events.TopicChangedEvent;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.notedb.ChangeNotes;
+import com.google.gerrit.server.project.NoSuchChangeException;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gwtorm.server.OrmException;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevWalk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+@Singleton
+public class StreamEventsApiListener implements
+    ChangeAbandonedListener,
+    ChangeMergedListener,
+    ChangeRestoredListener,
+    CommentAddedListener,
+    DraftPublishedListener,
+    GitReferenceUpdatedListener,
+    HashtagsEditedListener,
+    NewProjectCreatedListener,
+    ReviewerAddedListener,
+    ReviewerDeletedListener,
+    RevisionCreatedListener,
+    TopicEditedListener {
+  private static final Logger log =
+      LoggerFactory.getLogger(StreamEventsApiListener.class);
+
+  public static class Module extends LifecycleModule {
+    @Override
+    protected void configure() {
+      DynamicSet.bind(binder(), ChangeAbandonedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), ChangeMergedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), ChangeRestoredListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), CommentAddedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), DraftPublishedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), GitReferenceUpdatedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), HashtagsEditedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), NewProjectCreatedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), ReviewerAddedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), ReviewerDeletedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), RevisionCreatedListener.class)
+        .to(StreamEventsApiListener.class);
+      DynamicSet.bind(binder(), TopicEditedListener.class)
+        .to(StreamEventsApiListener.class);
+    }
+  }
+
+  private final DynamicItem<EventDispatcher> dispatcher;
+  private final Provider<ReviewDb> db;
+  private final EventFactory eventFactory;
+  private final ProjectCache projectCache;
+  private final GitRepositoryManager repoManager;
+  private final PatchSetUtil psUtil;
+  private final ChangeNotes.Factory changeNotesFactory;
+
+  @Inject
+  StreamEventsApiListener(DynamicItem<EventDispatcher> dispatcher,
+      Provider<ReviewDb> db,
+      EventFactory eventFactory,
+      ProjectCache projectCache,
+      GitRepositoryManager repoManager,
+      PatchSetUtil psUtil,
+      ChangeNotes.Factory changeNotesFactory) {
+    this.dispatcher = dispatcher;
+    this.db = db;
+    this.eventFactory = eventFactory;
+    this.projectCache = projectCache;
+    this.repoManager = repoManager;
+    this.psUtil = psUtil;
+    this.changeNotesFactory = changeNotesFactory;
+  }
+
+  private ChangeNotes getNotes(ChangeInfo info) throws OrmException {
+    try {
+      return changeNotesFactory.createChecked(new Change.Id(info._number));
+    } catch (NoSuchChangeException e) {
+      throw new OrmException(e);
+    }
+  }
+
+  private Change getChange(ChangeInfo info) throws OrmException {
+    return getNotes(info).getChange();
+  }
+
+  private PatchSet getPatchSet(ChangeNotes notes, RevisionInfo info)
+      throws OrmException {
+    return psUtil.get(db.get(), notes, PatchSet.Id.fromRef(info.ref));
+  }
+
+  private Supplier<ChangeAttribute> changeAttributeSupplier(
+      final Change change) {
+    return Suppliers.memoize(
+        new Supplier<ChangeAttribute>() {
+          @Override
+          public ChangeAttribute get() {
+            return eventFactory.asChangeAttribute(change);
+          }
+        });
+  }
+
+  private Supplier<AccountAttribute> accountAttributeSupplier(
+      final AccountInfo account) {
+    return Suppliers.memoize(
+        new Supplier<AccountAttribute>() {
+          @Override
+          public AccountAttribute get() {
+            return eventFactory.asAccountAttribute(
+                new Account.Id(account._accountId));
+          }
+        });
+  }
+
+  private Supplier<PatchSetAttribute> patchSetAttributeSupplier(
+      final Change change, final PatchSet patchSet) {
+    return Suppliers.memoize(
+        new Supplier<PatchSetAttribute>() {
+          @Override
+          public PatchSetAttribute get() {
+            try (Repository repo =
+                  repoManager.openRepository(change.getProject());
+                RevWalk revWalk = new RevWalk(repo)) {
+              return eventFactory.asPatchSetAttribute(
+                  revWalk, change, patchSet);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+  }
+
+  private static Map<String, Short> convertApprovalsMap(
+      Map<String, ApprovalInfo> approvals) {
+    Map<String, Short> result = new HashMap<>();
+    for (Entry<String, ApprovalInfo> e : approvals.entrySet()) {
+      Short value =
+          e.getValue().value == null ? null : e.getValue().value.shortValue();
+      result.put(e.getKey(), value);
+    }
+    return result;
+  }
+
+  private ApprovalAttribute getApprovalAttribute(LabelTypes labelTypes,
+      Entry<String, Short> approval,
+      Map<String, Short> oldApprovals) {
+  ApprovalAttribute a = new ApprovalAttribute();
+    a.type = approval.getKey();
+
+    if (oldApprovals != null && !oldApprovals.isEmpty()) {
+      if (oldApprovals.get(approval.getKey()) != null) {
+        a.oldValue = Short.toString(oldApprovals.get(approval.getKey()));
+      }
+    }
+    LabelType lt = labelTypes.byLabel(approval.getKey());
+    if (lt != null) {
+      a.description = lt.getName();
+    }
+    if (approval.getValue() != null) {
+      a.value = Short.toString(approval.getValue());
+    }
+    return a;
+  }
+
+  private Supplier<ApprovalAttribute[]> approvalsAttributeSupplier(
+      final Change change, Map<String, ApprovalInfo> newApprovals,
+      final Map<String, ApprovalInfo> oldApprovals) {
+    final Map<String, Short> approvals = convertApprovalsMap(newApprovals);
+    return Suppliers.memoize(
+        new Supplier<ApprovalAttribute[]>() {
+          @Override
+          public ApprovalAttribute[] get() {
+            LabelTypes labelTypes = projectCache.get(
+                change.getProject()).getLabelTypes();
+            if (approvals.size() > 0) {
+              ApprovalAttribute[] r = new ApprovalAttribute[approvals.size()];
+              int i = 0;
+              for (Map.Entry<String, Short> approval : approvals.entrySet()) {
+                r[i++] = getApprovalAttribute(labelTypes, approval,
+                    convertApprovalsMap(oldApprovals));
+              }
+              return r;
+            }
+            return null;
+          }
+        });
+  }
+
+  String[] hashtagArray(Collection<String> hashtags) {
+    if (hashtags != null && hashtags.size() > 0) {
+      return Sets.newHashSet(hashtags).toArray(
+          new String[hashtags.size()]);
+    }
+    return null;
+  }
+
+  @Override
+  public void onTopicEdited(TopicEditedListener.Event ev) {
+    try {
+      Change change = getChange(ev.getChange());
+      TopicChangedEvent event = new TopicChangedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.changer = accountAttributeSupplier(ev.getEditor());
+      event.oldTopic = ev.getOldTopic();
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onRevisionCreated(RevisionCreatedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      PatchSet patchSet = getPatchSet(notes, ev.getRevision());
+      PatchSetCreatedEvent event = new PatchSetCreatedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.patchSet = patchSetAttributeSupplier(change, patchSet);
+      event.uploader = accountAttributeSupplier(ev.getUploader());
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onReviewerDeleted(final ReviewerDeletedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      ReviewerDeletedEvent event = new ReviewerDeletedEvent(change);
+      event.change = changeAttributeSupplier(change);
+      event.patchSet = patchSetAttributeSupplier(change,
+          psUtil.current(db.get(), notes));
+      event.reviewer = accountAttributeSupplier(ev.getReviewer());
+      event.comment = ev.getComment();
+      event.approvals = approvalsAttributeSupplier(change,
+          ev.getNewApprovals(), ev.getOldApprovals());
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+
+  }
+
+  @Override
+  public void onReviewerAdded(ReviewerAddedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      ReviewerAddedEvent event = new ReviewerAddedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.patchSet = patchSetAttributeSupplier(change,
+          psUtil.current(db.get(), notes));
+      event.reviewer = accountAttributeSupplier(ev.getReviewer());
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onNewProjectCreated(NewProjectCreatedListener.Event ev) {
+    ProjectCreatedEvent event = new ProjectCreatedEvent();
+    event.projectName = ev.getProjectName();
+    event.headName = ev.getHeadName();
+
+    dispatcher.get().postEvent(event.getProjectNameKey(), event);
+  }
+
+  @Override
+  public void onHashtagsEdited(HashtagsEditedListener.Event ev) {
+    try {
+      Change change = getChange(ev.getChange());
+      HashtagsChangedEvent event = new HashtagsChangedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.editor = accountAttributeSupplier(ev.getEditor());
+      event.hashtags = hashtagArray(ev.getHashtags());
+      event.added = hashtagArray(ev.getAddedHashtags());
+      event.removed = hashtagArray(ev.getRemovedHashtags());
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onGitReferenceUpdated(final GitReferenceUpdatedListener.Event ev) {
+    RefUpdatedEvent event = new RefUpdatedEvent();
+    if (ev.getUpdater() != null) {
+      event.submitter = accountAttributeSupplier(ev.getUpdater());
+    }
+    final Branch.NameKey refName =
+        new Branch.NameKey(ev.getProjectName(), ev.getRefName());
+    event.refUpdate = Suppliers.memoize(
+        new Supplier<RefUpdateAttribute>() {
+          @Override
+          public RefUpdateAttribute get() {
+            return eventFactory.asRefUpdateAttribute(
+                ObjectId.fromString(ev.getOldObjectId()),
+                ObjectId.fromString(ev.getNewObjectId()),
+                refName);
+          }
+        });
+    dispatcher.get().postEvent(refName, event);
+  }
+
+  @Override
+  public void onDraftPublished(DraftPublishedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      PatchSet ps = getPatchSet(notes, ev.getRevision());
+      DraftPublishedEvent event = new DraftPublishedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.patchSet = patchSetAttributeSupplier(change, ps);
+      event.uploader = accountAttributeSupplier(ev.getPublisher());
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onCommentAdded(CommentAddedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      PatchSet ps = getPatchSet(notes, ev.getRevision());
+      CommentAddedEvent event = new CommentAddedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.author =  accountAttributeSupplier(ev.getAuthor());
+      event.patchSet = patchSetAttributeSupplier(change, ps);
+      event.comment = ev.getComment();
+      event.approvals = approvalsAttributeSupplier(
+          change, ev.getApprovals(), ev.getOldApprovals());
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onChangeRestored(ChangeRestoredListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      ChangeRestoredEvent event = new ChangeRestoredEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.restorer = accountAttributeSupplier(ev.getRestorer());
+      event.patchSet = patchSetAttributeSupplier(change,
+          psUtil.current(db.get(), notes));
+      event.reason = ev.getReason();
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onChangeMerged(ChangeMergedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      ChangeMergedEvent event = new ChangeMergedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.submitter = accountAttributeSupplier(ev.getMerger());
+      event.patchSet = patchSetAttributeSupplier(change,
+          psUtil.current(db.get(), notes));
+      event.newRev = ev.getNewRevisionId();
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+
+  @Override
+  public void onChangeAbandoned(ChangeAbandonedListener.Event ev) {
+    try {
+      ChangeNotes notes = getNotes(ev.getChange());
+      Change change = notes.getChange();
+      ChangeAbandonedEvent event = new ChangeAbandonedEvent(change);
+
+      event.change = changeAttributeSupplier(change);
+      event.abandoner = accountAttributeSupplier(ev.getAbandoner());
+      event.patchSet = patchSetAttributeSupplier(change,
+          psUtil.current(db.get(), notes));
+      event.reason = ev.getReason();
+
+      dispatcher.get().postEvent(change, event, db.get());
+    } catch (OrmException e) {
+      log.error("Failed to dispatch event", e);
+    }
+  }
+}
diff --git a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
index 23b816c..9c1bab8 100644
--- a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
+++ b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java
@@ -21,6 +21,7 @@
 import com.google.gerrit.common.ChangeHookApiListener;
 import com.google.gerrit.common.ChangeHookRunner;
 import com.google.gerrit.common.EventBroker;
+import com.google.gerrit.common.StreamEventsApiListener;
 import com.google.gerrit.gpg.GpgModule;
 import com.google.gerrit.httpd.auth.oauth.OAuthModule;
 import com.google.gerrit.httpd.auth.openid.OpenIdModule;
@@ -300,6 +301,7 @@
     modules.add(new AccountPatchReviewStoreImpl.Module());
     modules.add(cfgInjector.getInstance(GitRepositoryManagerModule.class));
     modules.add(new ChangeHookApiListener.Module());
+    modules.add(new StreamEventsApiListener.Module());
     modules.add(new ChangeHookRunner.Module());
     modules.add(new ReceiveCommitsExecutorModule());
     modules.add(new DiffExecutorModule());