Create an events plugin

Add a stream command which behaves much like the core stream-events
command except that it can additionally take a --ids switch which add
event ids to each event, and a --resume-after switch to start streaming
events at an older event.  This plugin captures events in memory, so the
events are lost on server restart, and the ids will reset.

Change-Id: I9343d04dd62eed582144a56f0e07342b7972e314
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..26238ea
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (C) 2016 The Android Open Source Project
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.googlesource.gerrit.plugins.events</groupId>
+  <artifactId>events</artifactId>
+  <name>events</name>
+  <packaging>jar</packaging>
+  <version>2.7</version>
+
+  <properties>
+    <Gerrit-ApiType>plugin</Gerrit-ApiType>
+    <Gerrit-ApiVersion>${project.version}</Gerrit-ApiVersion>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <configuration>
+          <archive>
+            <manifestEntries>
+              <Gerrit-Module>com.googlesource.gerrit.plugins.events.Module</Gerrit-Module>
+              <Gerrit-SshModule>com.googlesource.gerrit.plugins.events.SshModule</Gerrit-SshModule>
+
+              <Implementation-Vendor>QuIC Qualcomm Innovation Center</Implementation-Vendor>
+              <Implementation-URL>https://gerrit-review.googlesource.com/#/admin/projects/plugins/events</Implementation-URL>
+
+              <Implementation-Title>Plugin ${project.artifactId}</Implementation-Title>
+              <Implementation-Version>${project.version}</Implementation-Version>
+
+              <Gerrit-ApiType>${Gerrit-ApiType}</Gerrit-ApiType>
+              <Gerrit-ApiVersion>${Gerrit-ApiVersion}</Gerrit-ApiVersion>
+            </manifestEntries>
+          </archive>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.3.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+          <encoding>UTF-8</encoding>
+          <fork>true</fork>
+          <compilerArgs>
+            <arg>-XX:MaxPermSize=256m</arg>
+          </compilerArgs>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.gerrit</groupId>
+      <artifactId>gerrit-${Gerrit-ApiType}-api</artifactId>
+      <version>${Gerrit-ApiVersion}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <repositories>
+    <repository>
+      <id>gerrit-api-repository</id>
+      <url>https://gerrit-api.commondatastorage.googleapis.com/release/</url>
+    </repository>
+  </repositories>
+</project>
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/BranchHelper.java b/src/main/java/com/googlesource/gerrit/plugins/events/BranchHelper.java
new file mode 100644
index 0000000..6ad01e9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/BranchHelper.java
@@ -0,0 +1,98 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.reviewdb.client.Branch;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gson.JsonElement;
+import com.google.inject.Inject;
+
+public class BranchHelper {
+  protected final ProjectCache projectCache;
+
+  @Inject
+  BranchHelper(ProjectCache projectCache) {
+    this.projectCache = projectCache;
+  }
+
+  public boolean isVisibleTo(JsonElement event, IdentifiedUser user) {
+    return isVisibleTo(getBranch(event), user);
+  }
+
+  public boolean isVisibleTo(Branch.NameKey branchName, IdentifiedUser user) {
+    if (branchName == null) {
+      return false;
+    }
+    ProjectState pe = projectCache.get(branchName.getParentKey());
+    if (pe == null) {
+      return false;
+    }
+    return pe.controlFor(user).controlForRef(branchName).isVisible();
+  }
+
+  public static Branch.NameKey getBranch(JsonElement event) {
+    // Known events of this type:
+    //  CommentAddedEvent, ChangeMergedEvent, ChangeAbandonedEvent,
+    //  ChangeRestoredEvent, DraftPublishedEvent, MergeFailedEvent,
+    //  PatchSetCreatedEvent, ReviewerAddedEvent:
+    JsonElement projectParent = event.getAsJsonObject().get("change");
+    if (projectParent == null) {
+      // Known events of this type: RefUpdatedEvent
+      projectParent = event.getAsJsonObject().get("refUpdate");
+    }
+    if (projectParent == null) {
+      // Known events of this type:
+      //  CommitReceivedEvent, RefReplicationDoneEvent, RefReplicatedEvent
+      projectParent = event;
+    }
+
+    if (projectParent != null) {
+      JsonElement project = projectParent.getAsJsonObject().get("project");
+      if (project != null) {
+        // Known events of this type:
+        //  CommentAddedEvent, ChangeMergedEvent, ChangeAbandonedEvent,
+        //  ChangeRestoredEvent, DraftPublishedEvent, MergeFailedEvent,
+        //  PatchSetCreatedEvent, ReviewerAddedEvent:
+        JsonElement branch = projectParent.getAsJsonObject().get("branch");
+        if (branch == null) {
+          // Known events of this type: RefUpdatedEvent, CommitReceivedEvent
+          branch = projectParent.getAsJsonObject().get("refName");
+        }
+        if (branch == null) {
+          // Known events of this type:
+          //  RefReplicationDoneEvent, RefReplicatedEvent
+          branch = projectParent.getAsJsonObject().get("ref");
+        }
+
+        if (branch != null) {
+          return getBranch(project, branch);
+        }
+      }
+    }
+    return null;
+  }
+
+  protected static Branch.NameKey getBranch(JsonElement project, JsonElement branch) {
+    return getBranch(project.getAsString(), branch.getAsString());
+  }
+
+  protected static Branch.NameKey getBranch(String project, String branch) {
+    return new Branch.NameKey(new Project.NameKey(project), (branch.startsWith("refs/") ? "" : "refs/heads/") + branch);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
new file mode 100644
index 0000000..422f377
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class CoreListener implements ChangeListener {
+  private static Logger log = LoggerFactory.getLogger(CoreListener.class);
+
+  protected static final Gson gson = new Gson();
+  protected final DynamicSet<StreamEventListener> listeners;
+  protected final EventStore store;
+
+  @Inject
+  protected CoreListener(EventStore store, DynamicSet<StreamEventListener> listeners) {
+    this.store = store;
+    this.listeners = listeners;
+  }
+
+  @Override
+  public void onChangeEvent(ChangeEvent event) {
+    try {
+      store.add(gson.toJson(event));
+    } catch (IOException e) {
+      log.error("Cannot add event to event store", e);
+    }
+    for (StreamEventListener l : listeners) {
+      l.onStreamEventUpdate();
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/EventStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/EventStore.java
new file mode 100644
index 0000000..77e086d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/EventStore.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public interface EventStore {
+  UUID getUuid() throws IOException;
+
+  long getHead() throws IOException;
+
+  long getTail() throws IOException;
+
+  void add(String event) throws IOException;
+
+  /** returns null if event does not exist */
+  String get(long n) throws IOException;
+
+  void trim(long trim) throws IOException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/MemStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/MemStore.java
new file mode 100644
index 0000000..e658f7c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/MemStore.java
@@ -0,0 +1,70 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+
+@Singleton
+public class MemStore implements EventStore {
+  protected final Map<Long, String> eventsByIndex = new ConcurrentHashMap<Long, String>();
+  protected final UUID uuid;
+
+  protected long head = 0;
+  protected long tail = 1;
+
+  public MemStore() {
+    uuid = UUID.randomUUID();
+  }
+
+  @Override
+  public UUID getUuid() {
+    return uuid;
+  }
+
+  @Override
+  public synchronized void add(String event) {
+    eventsByIndex.put(head + 1, event);
+    head++;
+  }
+
+  @Override
+  public String get(long n) {
+    return eventsByIndex.get(n);
+  }
+
+  @Override
+  public long getHead() {
+    return head;
+  }
+
+  @Override
+  public long getTail() {
+    return head == 0 ? 0 : tail;
+  }
+
+  @Override
+  public void trim(long trim) {
+    if (trim >= head) {
+      trim = head - 1;
+    }
+    for (long i = tail; i <= trim; i++) {
+      tail++;
+      eventsByIndex.remove(i);
+    }
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/Module.java b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
new file mode 100644
index 0000000..3c9d6cf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.AbstractModule;
+
+public class Module extends AbstractModule {
+  @Override
+  protected void configure() {
+    DynamicSet.setOf(binder(), StreamEventListener.class);
+    bind(EventStore.class).to(MemStore.class);
+    DynamicSet.bind(binder(), ChangeListener.class).to(CoreListener.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/SshModule.java b/src/main/java/com/googlesource/gerrit/plugins/events/SshModule.java
new file mode 100644
index 0000000..7eb00c9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/SshModule.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.sshd.PluginCommandModule;
+
+public class SshModule extends PluginCommandModule {
+  @Override
+  protected void configureCommands() {
+    command(StreamEvents.class);
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEventListener.java
new file mode 100644
index 0000000..e54f164
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEventListener.java
@@ -0,0 +1,19 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+public interface StreamEventListener {
+  void onStreamEventUpdate();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
new file mode 100644
index 0000000..fa2c70c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
@@ -0,0 +1,247 @@
+// Copyright (C) 2010 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
+import com.google.gerrit.sshd.BaseCommand;
+import com.google.gerrit.sshd.CommandMetaData;
+import com.google.gerrit.sshd.StreamCommandExecutor;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.Future;
+import org.apache.sshd.server.Environment;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RequiresCapability(GlobalCapability.STREAM_EVENTS)
+@CommandMetaData(name = "stream", descr = "Monitor events occurring in real time")
+public final class StreamEvents extends BaseCommand {
+  private static final Logger log = LoggerFactory.getLogger(StreamEvents.class);
+
+  protected static final int BATCH_SIZE = 32; // yield thread after
+  protected static final Gson gson = new Gson();
+  protected static final JsonParser parser = new JsonParser();
+
+  @Option(
+    name = "--resume-after",
+    metaVar = "RESUME_AFTER",
+    usage = "event id after which to resume playing events on connection"
+  )
+  protected void parseId(String arg) throws IOException {
+    resume = 0;
+    if (arg.equals("0")) {
+      return;
+    }
+
+    String[] ids = arg.split(":");
+    if (ids.length == 2) {
+      if (!ids[0].equals(events.getUuid().toString())) { // store has changed
+        return;
+      }
+
+      try {
+        resume = new Long(ids[1]);
+        return;
+      } catch (NumberFormatException e) { // fall through
+      }
+    }
+    throw new IllegalArgumentException("Invalid event Id: " + arg);
+  }
+
+  protected long resume = -1;
+
+  @Option(name = "--ids", usage = "add ids to events (useful for resuming after a disconnect)")
+  protected boolean includeIds = false;
+
+  @Inject @StreamCommandExecutor protected WorkQueue.Executor threadPool;
+
+  @Inject protected EventStore events;
+
+  @Inject protected DynamicSet<StreamEventListener> subscriptionListeners;
+
+  @Inject protected BranchHelper perms;
+
+  @Inject protected IdentifiedUser currentUser;
+
+  protected CancelableRunnable flusherRunnable;
+  protected RegistrationHandle subscription;
+
+  protected final Object crossThreadlock = new Object();
+  protected Future<?> flusherTask;
+  protected PrintWriter stdout;
+
+  protected long sent;
+  protected volatile boolean shuttingDown = false;
+
+  @Override
+  public void start(final Environment env) throws IOException {
+    try {
+      parseCommandLine();
+    } catch (UnloggedFailure e) {
+      String msg = e.getMessage();
+      if (!msg.endsWith("\n")) {
+        msg += "\n";
+      }
+      err.write(msg.getBytes("UTF-8"));
+      err.flush();
+      onExit(1);
+      return;
+    }
+    stdout = toPrintWriter(out);
+
+    initSent();
+    flusherRunnable = createFlusherRunnable();
+    subscribe();
+    startFlush();
+  }
+
+  protected CancelableRunnable createFlusherRunnable() {
+    return new CancelableRunnable() {
+      @Override
+      public void run() {
+        try {
+          flushBatch();
+        } catch (IOException e) {
+          log.error("Error Flushing Stream Events", e);
+        }
+      }
+
+      @Override
+      public void cancel() {
+        onExit(0);
+      }
+    };
+  }
+
+  protected void initSent() throws IOException {
+    long head = events.getHead();
+    long tail = events.getTail();
+    if (resume == -1 || resume > head) {
+      sent = head;
+    } else {
+      sent = resume;
+    }
+    if (sent < tail) {
+      sent = tail - 1;
+    }
+  }
+
+  protected void startFlush() throws IOException {
+    synchronized (crossThreadlock) {
+      if (flusherTask == null && !shuttingDown) {
+        if (sent < events.getHead()) {
+          flusherTask = threadPool.submit(flusherRunnable);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void onExit(final int rc) {
+    unsubscribe();
+    synchronized (crossThreadlock) {
+      shuttingDown = true;
+    }
+    super.onExit(rc);
+  }
+
+  @Override
+  public void destroy() {
+    unsubscribe();
+    synchronized (crossThreadlock) {
+      boolean alreadyShuttingDown = shuttingDown;
+      shuttingDown = true;
+      if (flusherTask != null) {
+        flusherTask.cancel(true);
+      } else if (!alreadyShuttingDown) {
+        onExit(0);
+      }
+    }
+  }
+
+  protected void subscribe() {
+    subscription =
+        subscriptionListeners.add(
+            new StreamEventListener() {
+              @Override
+              public void onStreamEventUpdate() {
+                try {
+                  startFlush();
+                } catch (IOException e) {
+                  log.error("Error starting to flushing Stream Events", e);
+                }
+              }
+            });
+  }
+
+  protected void unsubscribe() {
+    if (subscription != null) {
+      subscription.remove();
+      subscription = null;
+    }
+  }
+
+  protected void flushBatch() throws IOException {
+    String uuid = events.getUuid().toString();
+    int processed = 0;
+    long head = events.getHead();
+    while (sent < head && processed < BATCH_SIZE) {
+      long sending = sent + 1;
+      String event = events.get(sending);
+      if (Thread.interrupted() || stdout.checkError()) {
+        onExit(0);
+        return;
+      }
+      flush(uuid, sending, event);
+      sent = sending;
+      processed++;
+    }
+    synchronized (crossThreadlock) {
+      flusherTask = null;
+    }
+    startFlush();
+  }
+
+  protected void flush(String uuid, long number, String json) {
+    if (json != null) {
+      JsonElement el = parser.parse(json);
+      if (perms.isVisibleTo(el, currentUser)) {
+        if (includeIds) {
+          el.getAsJsonObject().addProperty("id", uuid + ":" + number);
+          json = gson.toJson(el);
+        }
+        flush(json + "\n");
+      }
+    }
+  }
+
+  protected void flush(String msg) {
+    synchronized (stdout) {
+      stdout.print(msg);
+      stdout.flush();
+    }
+  }
+}
diff --git a/src/main/resources/Documentation/cmd-stream.md b/src/main/resources/Documentation/cmd-stream.md
new file mode 100644
index 0000000..2e9b7ad
--- /dev/null
+++ b/src/main/resources/Documentation/cmd-stream.md
@@ -0,0 +1,280 @@
+@PLUGIN@ stream
+===============
+
+NAME
+----
+@PLUGIN@ stream - Monitor events occurring in real time
+
+SYNOPSIS
+--------
+```
+ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ stream
+   [--ids]
+   [--resume-after <RESUME_AFTER>]
+```
+
+DESCRIPTION
+-----------
+
+Provides a portal into the major events occurring on the server,
+outputing activity data in real-time to the client.  Events are
+filtered by the caller's access permissions, ensuring the caller
+only receives events for changes they can view on the web, or in
+the project repository.
+
+It is possible to make the events numbered so that clients may
+request the server to send all the known (and visible) events
+starting after a specific event using its id.  This makes it
+possible to retrieve the events which occured when the client
+was disconnected.
+
+Event output is in JSON, one event per line.
+
+OPTIONS
+-----------
+**--ids**
+
+: add ids to events, useful for resuming after a disconnect
+(see --resume-after).
+
+**--resume-after**
+
+: event id after which to resume playing events on connection.
+
+
+ACCESS
+------
+Any user who has configured an SSH key.
+
+SCRIPTING
+---------
+This command is intended to be used in scripts.
+
+EXAMPLES
+--------
+
+```
+    $ ssh -p 29418 review.example.com @PLUGIN@ stream-events --ids
+    {"type":"comment-added", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:1"}
+    {"type":"change-merged", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:2"}
+
+```
+
+```
+    $ ssh -p 29418 review.example.com @PLUGIN@ stream-events --ids \
+        --resume-after bdedff7d-34fd-4459-a6a7-3f738f32c01d:1
+    {"type":"change-merged", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:2"}
+    {"type":"change-abandoned", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:3"}
+
+```
+
+SCHEMA
+------
+The JSON messages consist of nested objects referencing the *change*,
+*patchSet*, *account* involved, and other attributes as appropriate.
+The currently supported message types are *patchset-created*,
+*draft-published*, *change-abandoned*, *change-restored*,
+*change-merged*, *merge-failed*, *comment-added*, *ref-updated* and
+*reviewer-added*.
+
+Note that any field may be missing in the JSON messages, so consumers of
+this JSON stream should deal with that appropriately.
+
+### Events
+
+#### Patchset Created
+
+**type**
+
+: "patchset-created"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+**uploader**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+#### Draft Published
+
+**type**
+
+:  "draft-published"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchset**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+**uploader**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+#### Change Abandoned
+
+**type**
+
+:  "change-abandoned"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**abandoner**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**reason**
+
+:  Reason for abandoning the change.
+
+#### Change Restored
+
+**type**
+
+:  "change-restored"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**restorer**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**reason**
+
+:  Reason for restoring the change.
+
+#### Change Merged
+
+**type**
+
+:  "change-merged"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**submitter**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+#### Merge Failed
+
+**type**
+
+:  "merge-failed"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**submitter**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**reason**
+
+:  Reason that the merge failed.
+
+#### Comment Added
+
+**type**
+
+:  "comment-added"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**author**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**approvals**
+
+: All [approval attributes](../../../Documentation/json.html#approval)
+
+**comment**
+
+:  Comment text author had written
+
+#### Ref Updated
+
+**type**
+
+:  "ref-updated"
+
+**submitter**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**refUpdate**
+
+: [refUpdate attribute](../../../Documentation/json.html#refUpdate)
+
+#### Reviewer Added
+
+**type**
+
+:  "reviewer-added"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchset**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+**reviewer**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+
+SEE ALSO
+--------
+
+* [JSON Data Formats](../../../Documentation/json.html)
+* [Access Controls](../../../Documentation/access-control.html)
diff --git a/test/lib_result.sh b/test/lib_result.sh
new file mode 100755
index 0000000..e883b81
--- /dev/null
+++ b/test/lib_result.sh
@@ -0,0 +1,22 @@
+# ---- TEST RESULTS ----
+result() { # test [error_message]
+    local result=$?
+    if [ $result -eq 0 ] ; then
+        echo "PASSED - $1 test"
+    else
+        echo "*** FAILED *** - $1 test"
+        RESULT=$result
+        [ $# -gt 1 ] && echo "$2"
+    fi
+}
+
+# output must match expected to pass
+result_out() { # test expected output
+    local disp=$(echo "Expected Output:" ;\
+                 echo "    $2" ;\
+                 echo "Actual Output:" ;\
+                 echo "    $3")
+
+    [ "$2" = "$3" ]
+    result "$1" "$disp"
+}
diff --git a/test/test_events_plugin.sh b/test/test_events_plugin.sh
new file mode 100755
index 0000000..9af3d4a
--- /dev/null
+++ b/test/test_events_plugin.sh
@@ -0,0 +1,247 @@
+#!/bin/bash
+
+q() { "$@" > /dev/null 2>&1 ; } # cmd [args...]  # quiet a command
+gssh() { ssh -p 29418 -x "$SERVER" "$@" 2>&1 ; } # run a gerrit ssh command
+mygit() { git --work-tree="$REPO_DIR" --git-dir="$GIT_DIR" "$@" ; } # [args...]
+
+cleanup() {
+    wait_event
+    (kill_captures ; sleep 1 ; kill_captures -9 ) &
+}
+
+# > uuid
+gen_uuid() { uuidgen | openssl dgst -sha1 -binary | xxd -p; }
+
+gen_commit_msg() { # msg > commit_msg
+    local msg=$1
+    echo "$msg
+
+Change-Id: I$(gen_uuid)"
+}
+
+get_change_num() { # < gerrit_push_response > changenum
+    local url=$(awk '/New Changes:/ { getline; print $2 }')
+    echo "${url##*\/}" | tr -d -c '[:digit:]'
+}
+
+create_change() { # [--dependent] [--draft] branch file [commit_message] > changenum
+    local opt_d opt_c opt_draft=false
+    [ "$1" = "--dependent" ] && { opt_d=$1 ; shift ; }
+    [ "$1" = "--draft" ] && { opt_draft=true ; shift ; }
+    local branch=$1 tmpfile=$2 msg=$3 out rtn
+    local content=$RANDOM dest=refs/for/$branch
+    "$opt_draft" && dest=refs/drafts/$branch
+
+    if [ -z "$opt_d" ] ; then
+        out=$(mygit fetch "$GITURL" "$branch" 2>&1) ||\
+            cleanup "Failed to fetch $branch: $out"
+        out=$(mygit checkout FETCH_HEAD 2>&1) ||\
+            cleanup "Failed to checkout $branch: $out"
+    fi
+
+    echo -e "$content" > "$tmpfile"
+
+    out=$(mygit add "$tmpfile" 2>&1) || cleanup "Failed to git add: $out"
+
+    [ -n "$msg" ] || msg=$(gen_commit_msg "Add $tmpfile")
+
+    out=$(mygit commit -m "$msg" 2>&1) ||\
+        cleanup "Failed to commit change: $out"
+    [ -n "$VERBOSE" ] && echo "  commit:$out" >&2
+
+    out=$(mygit push "$GITURL" "HEAD:$dest" 2>&1) ||\
+        cleanup "Failed to push change: $out"
+    out=$(echo "$out" | get_change_num) ; rtn=$? ; echo "$out"
+    [ -n "$VERBOSE" ] && echo "  change:$out" >&2
+    return $rtn
+}
+
+review() { gssh gerrit review "$@" ; }
+
+submit() { # change,ps
+   local out=$(review "$1" --submit)
+   local acl_err="one or more approvals failed; review output above"
+   local conflict_err="The change could not be merged due to a path conflict."
+
+   if echo "$out" | grep -q "$acl_err" ; then
+      if ! echo "$out" | grep -q "$conflict_err" ; then
+            echo "$out"
+            echo "User needs ACLs to approve and submit changes to $REF_BRANCH"
+            exit 1
+       fi
+   fi
+}
+
+# ------------------------- Event Capturing ---------------------------
+
+kill_captures() { # sig
+    local pid
+    for pid in "${CAPTURE_PIDS[@]}" ; do
+        q kill $1 $pid
+    done
+}
+
+setup_captures() {
+    ssh -p 29418 -x "$SERVER" "${CORE_CMD[@]}" > "$EVENTS_CORE" &
+    CAPTURE_PIDS=("${CAPTURE_PIDS[@]}" $!)
+    ssh -p 29418 -x "$SERVER" "${PLUGIN_CMD[@]}" > "$EVENTS_PLUGIN" &
+    CAPTURE_PIDS=("${CAPTURE_PIDS[@]}" $!)
+}
+
+capture_events() { # count
+    local count=$1
+    [ -n "$count" ] || count=1
+    ssh -p 29418 -x "$SERVER" "${PLUGIN_CMD[@]}" > "$EVENT_FIFO" &
+    CAPTURE_PID_SSH=$!
+    head -n $count < "$EVENT_FIFO" > "$EVENTS" &
+    CAPTURE_PID_HEAD=$!
+    sleep 1
+}
+
+wait_event() {
+   (sleep 1 ; q kill -9 $CAPTURE_PID_SSH ; q kill -9 $CAPTURE_PID_HEAD ) &
+    q wait $CAPTURE_PID_SSH $CAPTURE_PID_HEAD
+}
+
+get_event() { # number
+    local number=$1
+    [ -n "$number" ] || number=1
+
+    awk "NR==$number" "$EVENTS"
+}
+
+result_type() { # test type [n]
+    local test=$1 type=$2 number=$3
+    [ -n "$number" ] || number=1
+    wait_event
+    local event=$(get_event "$number")
+    echo "$event" | grep -q "\"type\":\"$type\""
+    result "$test $type" "$event"
+}
+
+# ------------------------- Usage ---------------------------
+
+usage() { # [error_message]
+    cat <<-EOF
+Usage: $MYPROG [-s|--server <server>] [-p|--project <project>]
+             [-r|--ref <ref branch>] [-g|--plugin <plugin>] [-h|--help]
+
+       -h|--help                usage/help
+       -s|--server <server>     server to use for the test (default: localhost)
+       -p|--project <project>   git project to use (default: project0)
+       -r|--ref <ref branch>    reference branch used to create branches (default: master)
+       --approvals <approvals>  approvals needed for submit (default: --code-review 2)
+       --plugin-cmd <cmd>       event streaming command for plugin (default: <plugin> stream)
+       --core-cmd <cmd>         event streaming command for core (default: gerrit stream-events)
+EOF
+
+    [ -n "$1" ] && echo -e '\n'"ERROR: $1"
+    exit 1
+}
+
+parseArgs() {
+    SERVER="localhost"
+    PROJECT="tools/test/project0"
+    REF_BRANCH="master"
+    APPROVALS="--code-review 2"
+    CORE_CMD=(gerrit stream-events)
+    PLUGIN_CMD=(events stream)
+    while (( "$#" )) ; do
+        case "$1" in
+            --server|-s)  shift; SERVER=$1 ;;
+            --project|-p) shift; PROJECT=$1 ;;
+            --ref|-r)     shift; REF_BRANCH=$1 ;;
+            --approvals)  shift; APPROVALS=$1 ;;
+            --plugin-cmd) shift; PLUGIN_CMD=($1) ;;
+            --core-cmd)   shift; CORE_CMD=($1) ;;
+            --help|-h)    usage ;;
+            --verbose|-v) VERBOSE=$1 ;;
+            *)            usage "invalid argument '$1'" ;;
+        esac
+        shift
+    done
+
+    [ -n "$SERVER" ]     || usage "server not set"
+    [ -n "$PROJECT" ]    || usage "project not set"
+    [ -n "$REF_BRANCH" ] || usage "ref branch not set"
+}
+
+MYPROG=$(basename "$0")
+MYDIR=$(dirname "$0")
+
+source "$MYDIR/lib_result.sh"
+
+parseArgs "$@"
+
+TEST_DIR=$(readlink -f "$MYDIR/../target/test")
+rm -rf "$TEST_DIR"
+mkdir -p "$TEST_DIR"
+
+GITURL=ssh://$SERVER:29418/$PROJECT
+DEST_REF=$REF_BRANCH
+echo "$REF_BRANCH" | grep -q '^refs/' || DEST_REF=refs/heads/$REF_BRANCH
+git ls-remote "$GITURL" | grep -q "$DEST_REF" || usage "invalid project/server/ref"
+
+REPO_DIR=$TEST_DIR/repo
+q git init "$REPO_DIR"
+GIT_DIR="$REPO_DIR/.git"
+FILE_A="$REPO_DIR/fileA"
+
+EVENTS_CORE=$TEST_DIR/events-core
+EVENTS_PLUGIN=$TEST_DIR/events-plugin
+EVENT_FIFO=$TEST_DIR/event-fifo
+EVENTS=$TEST_DIR/events
+mkfifo "$EVENT_FIFO"
+
+trap cleanup EXIT
+
+setup_captures
+
+# ------------------------- Individual Event Tests ---------------------------
+GROUP=visible-events
+type=patchset-created
+capture_events
+ch1=$(create_change --draft "$REF_BRANCH" "$FILE_A") || exit
+result_type "$GROUP" "$type"
+
+type=draft-published
+capture_events
+review "$ch1,1" --publish
+result_type "$GROUP" "$type"
+
+type=change-abandoned
+capture_events
+review "$ch1,1" --abandon
+result_type "$GROUP" "$type"
+
+type=change-restored
+capture_events
+review "$ch1,1" --restore
+result_type "$GROUP" "$type"
+
+type=comment-added
+capture_events
+review "$ch1,1" --message "my_comment" $APPROVALS
+result_type "$GROUP" "$type"
+
+ch2=$(create_change "$REF_BRANCH" "$FILE_A") || exit
+review "$ch2,1" $APPROVALS
+
+type=change-merged
+capture_events 2
+submit "$ch1,1"
+result_type "$GROUP" "ref-updated"
+result_type "$GROUP" "$type" 2
+
+type=merge-failed
+capture_events
+submit "$ch2,1"
+result_type "$GROUP" "$type"
+
+# reviewer-added needs to be tested via Rest-API
+
+# ------------------------- Compare them all to Core -------------------------
+
+out=$(diff "$EVENTS_CORE" "$EVENTS_PLUGIN")
+result "core/plugin diff" "$out"
+