Create an events plugin
Add a stream command which behaves much like the core stream-events
command except that it can additionally take a --ids switch which add
event ids to each event, and a --resume-after switch to start streaming
events at an older event. This plugin captures events in memory, so the
events are lost on server restart, and the ids will reset.
Change-Id: I9343d04dd62eed582144a56f0e07342b7972e314
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..26238ea
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (C) 2016 The Android Open Source Project
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.googlesource.gerrit.plugins.events</groupId>
+ <artifactId>events</artifactId>
+ <name>events</name>
+ <packaging>jar</packaging>
+ <version>2.7</version>
+
+ <properties>
+ <Gerrit-ApiType>plugin</Gerrit-ApiType>
+ <Gerrit-ApiVersion>${project.version}</Gerrit-ApiVersion>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Gerrit-Module>com.googlesource.gerrit.plugins.events.Module</Gerrit-Module>
+ <Gerrit-SshModule>com.googlesource.gerrit.plugins.events.SshModule</Gerrit-SshModule>
+
+ <Implementation-Vendor>QuIC Qualcomm Innovation Center</Implementation-Vendor>
+ <Implementation-URL>https://gerrit-review.googlesource.com/#/admin/projects/plugins/events</Implementation-URL>
+
+ <Implementation-Title>Plugin ${project.artifactId}</Implementation-Title>
+ <Implementation-Version>${project.version}</Implementation-Version>
+
+ <Gerrit-ApiType>${Gerrit-ApiType}</Gerrit-ApiType>
+ <Gerrit-ApiVersion>${Gerrit-ApiVersion}</Gerrit-ApiVersion>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <encoding>UTF-8</encoding>
+ <fork>true</fork>
+ <compilerArgs>
+ <arg>-XX:MaxPermSize=256m</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.gerrit</groupId>
+ <artifactId>gerrit-${Gerrit-ApiType}-api</artifactId>
+ <version>${Gerrit-ApiVersion}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <repositories>
+ <repository>
+ <id>gerrit-api-repository</id>
+ <url>https://gerrit-api.commondatastorage.googleapis.com/release/</url>
+ </repository>
+ </repositories>
+</project>
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/BranchHelper.java b/src/main/java/com/googlesource/gerrit/plugins/events/BranchHelper.java
new file mode 100644
index 0000000..6ad01e9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/BranchHelper.java
@@ -0,0 +1,98 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.reviewdb.client.Branch;
+import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.project.ProjectCache;
+import com.google.gerrit.server.project.ProjectControl;
+import com.google.gerrit.server.project.ProjectState;
+import com.google.gson.JsonElement;
+import com.google.inject.Inject;
+
+public class BranchHelper {
+ protected final ProjectCache projectCache;
+
+ @Inject
+ BranchHelper(ProjectCache projectCache) {
+ this.projectCache = projectCache;
+ }
+
+ public boolean isVisibleTo(JsonElement event, IdentifiedUser user) {
+ return isVisibleTo(getBranch(event), user);
+ }
+
+ public boolean isVisibleTo(Branch.NameKey branchName, IdentifiedUser user) {
+ if (branchName == null) {
+ return false;
+ }
+ ProjectState pe = projectCache.get(branchName.getParentKey());
+ if (pe == null) {
+ return false;
+ }
+ return pe.controlFor(user).controlForRef(branchName).isVisible();
+ }
+
+ public static Branch.NameKey getBranch(JsonElement event) {
+ // Known events of this type:
+ // CommentAddedEvent, ChangeMergedEvent, ChangeAbandonedEvent,
+ // ChangeRestoredEvent, DraftPublishedEvent, MergeFailedEvent,
+ // PatchSetCreatedEvent, ReviewerAddedEvent:
+ JsonElement projectParent = event.getAsJsonObject().get("change");
+ if (projectParent == null) {
+ // Known events of this type: RefUpdatedEvent
+ projectParent = event.getAsJsonObject().get("refUpdate");
+ }
+ if (projectParent == null) {
+ // Known events of this type:
+ // CommitReceivedEvent, RefReplicationDoneEvent, RefReplicatedEvent
+ projectParent = event;
+ }
+
+ if (projectParent != null) {
+ JsonElement project = projectParent.getAsJsonObject().get("project");
+ if (project != null) {
+ // Known events of this type:
+ // CommentAddedEvent, ChangeMergedEvent, ChangeAbandonedEvent,
+ // ChangeRestoredEvent, DraftPublishedEvent, MergeFailedEvent,
+ // PatchSetCreatedEvent, ReviewerAddedEvent:
+ JsonElement branch = projectParent.getAsJsonObject().get("branch");
+ if (branch == null) {
+ // Known events of this type: RefUpdatedEvent, CommitReceivedEvent
+ branch = projectParent.getAsJsonObject().get("refName");
+ }
+ if (branch == null) {
+ // Known events of this type:
+ // RefReplicationDoneEvent, RefReplicatedEvent
+ branch = projectParent.getAsJsonObject().get("ref");
+ }
+
+ if (branch != null) {
+ return getBranch(project, branch);
+ }
+ }
+ }
+ return null;
+ }
+
+ protected static Branch.NameKey getBranch(JsonElement project, JsonElement branch) {
+ return getBranch(project.getAsString(), branch.getAsString());
+ }
+
+ protected static Branch.NameKey getBranch(String project, String branch) {
+ return new Branch.NameKey(new Project.NameKey(project), (branch.startsWith("refs/") ? "" : "refs/heads/") + branch);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
new file mode 100644
index 0000000..422f377
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/CoreListener.java
@@ -0,0 +1,52 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.server.events.ChangeEvent;
+import com.google.gson.Gson;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class CoreListener implements ChangeListener {
+ private static Logger log = LoggerFactory.getLogger(CoreListener.class);
+
+ protected static final Gson gson = new Gson();
+ protected final DynamicSet<StreamEventListener> listeners;
+ protected final EventStore store;
+
+ @Inject
+ protected CoreListener(EventStore store, DynamicSet<StreamEventListener> listeners) {
+ this.store = store;
+ this.listeners = listeners;
+ }
+
+ @Override
+ public void onChangeEvent(ChangeEvent event) {
+ try {
+ store.add(gson.toJson(event));
+ } catch (IOException e) {
+ log.error("Cannot add event to event store", e);
+ }
+ for (StreamEventListener l : listeners) {
+ l.onStreamEventUpdate();
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/EventStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/EventStore.java
new file mode 100644
index 0000000..77e086d
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/EventStore.java
@@ -0,0 +1,33 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public interface EventStore {
+ UUID getUuid() throws IOException;
+
+ long getHead() throws IOException;
+
+ long getTail() throws IOException;
+
+ void add(String event) throws IOException;
+
+ /** returns null if event does not exist */
+ String get(long n) throws IOException;
+
+ void trim(long trim) throws IOException;
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/MemStore.java b/src/main/java/com/googlesource/gerrit/plugins/events/MemStore.java
new file mode 100644
index 0000000..e658f7c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/MemStore.java
@@ -0,0 +1,70 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.inject.Singleton;
+
+@Singleton
+public class MemStore implements EventStore {
+ protected final Map<Long, String> eventsByIndex = new ConcurrentHashMap<Long, String>();
+ protected final UUID uuid;
+
+ protected long head = 0;
+ protected long tail = 1;
+
+ public MemStore() {
+ uuid = UUID.randomUUID();
+ }
+
+ @Override
+ public UUID getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public synchronized void add(String event) {
+ eventsByIndex.put(head + 1, event);
+ head++;
+ }
+
+ @Override
+ public String get(long n) {
+ return eventsByIndex.get(n);
+ }
+
+ @Override
+ public long getHead() {
+ return head;
+ }
+
+ @Override
+ public long getTail() {
+ return head == 0 ? 0 : tail;
+ }
+
+ @Override
+ public void trim(long trim) {
+ if (trim >= head) {
+ trim = head - 1;
+ }
+ for (long i = tail; i <= trim; i++) {
+ tail++;
+ eventsByIndex.remove(i);
+ }
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/Module.java b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
new file mode 100644
index 0000000..3c9d6cf
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/Module.java
@@ -0,0 +1,28 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.common.ChangeListener;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.inject.AbstractModule;
+
+public class Module extends AbstractModule {
+ @Override
+ protected void configure() {
+ DynamicSet.setOf(binder(), StreamEventListener.class);
+ bind(EventStore.class).to(MemStore.class);
+ DynamicSet.bind(binder(), ChangeListener.class).to(CoreListener.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/SshModule.java b/src/main/java/com/googlesource/gerrit/plugins/events/SshModule.java
new file mode 100644
index 0000000..7eb00c9
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/SshModule.java
@@ -0,0 +1,24 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.sshd.PluginCommandModule;
+
+public class SshModule extends PluginCommandModule {
+ @Override
+ protected void configureCommands() {
+ command(StreamEvents.class);
+ }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/StreamEventListener.java b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEventListener.java
new file mode 100644
index 0000000..e54f164
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEventListener.java
@@ -0,0 +1,19 @@
+// Copyright (C) 2016 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+public interface StreamEventListener {
+ void onStreamEventUpdate();
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
new file mode 100644
index 0000000..fa2c70c
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/events/StreamEvents.java
@@ -0,0 +1,247 @@
+// Copyright (C) 2010 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.events;
+
+import com.google.gerrit.common.data.GlobalCapability;
+import com.google.gerrit.extensions.annotations.RequiresCapability;
+import com.google.gerrit.extensions.registration.DynamicSet;
+import com.google.gerrit.extensions.registration.RegistrationHandle;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
+import com.google.gerrit.sshd.BaseCommand;
+import com.google.gerrit.sshd.CommandMetaData;
+import com.google.gerrit.sshd.StreamCommandExecutor;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.concurrent.Future;
+import org.apache.sshd.server.Environment;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RequiresCapability(GlobalCapability.STREAM_EVENTS)
+@CommandMetaData(name = "stream", descr = "Monitor events occurring in real time")
+public final class StreamEvents extends BaseCommand {
+ private static final Logger log = LoggerFactory.getLogger(StreamEvents.class);
+
+ protected static final int BATCH_SIZE = 32; // yield thread after
+ protected static final Gson gson = new Gson();
+ protected static final JsonParser parser = new JsonParser();
+
+ @Option(
+ name = "--resume-after",
+ metaVar = "RESUME_AFTER",
+ usage = "event id after which to resume playing events on connection"
+ )
+ protected void parseId(String arg) throws IOException {
+ resume = 0;
+ if (arg.equals("0")) {
+ return;
+ }
+
+ String[] ids = arg.split(":");
+ if (ids.length == 2) {
+ if (!ids[0].equals(events.getUuid().toString())) { // store has changed
+ return;
+ }
+
+ try {
+ resume = new Long(ids[1]);
+ return;
+ } catch (NumberFormatException e) { // fall through
+ }
+ }
+ throw new IllegalArgumentException("Invalid event Id: " + arg);
+ }
+
+ protected long resume = -1;
+
+ @Option(name = "--ids", usage = "add ids to events (useful for resuming after a disconnect)")
+ protected boolean includeIds = false;
+
+ @Inject @StreamCommandExecutor protected WorkQueue.Executor threadPool;
+
+ @Inject protected EventStore events;
+
+ @Inject protected DynamicSet<StreamEventListener> subscriptionListeners;
+
+ @Inject protected BranchHelper perms;
+
+ @Inject protected IdentifiedUser currentUser;
+
+ protected CancelableRunnable flusherRunnable;
+ protected RegistrationHandle subscription;
+
+ protected final Object crossThreadlock = new Object();
+ protected Future<?> flusherTask;
+ protected PrintWriter stdout;
+
+ protected long sent;
+ protected volatile boolean shuttingDown = false;
+
+ @Override
+ public void start(final Environment env) throws IOException {
+ try {
+ parseCommandLine();
+ } catch (UnloggedFailure e) {
+ String msg = e.getMessage();
+ if (!msg.endsWith("\n")) {
+ msg += "\n";
+ }
+ err.write(msg.getBytes("UTF-8"));
+ err.flush();
+ onExit(1);
+ return;
+ }
+ stdout = toPrintWriter(out);
+
+ initSent();
+ flusherRunnable = createFlusherRunnable();
+ subscribe();
+ startFlush();
+ }
+
+ protected CancelableRunnable createFlusherRunnable() {
+ return new CancelableRunnable() {
+ @Override
+ public void run() {
+ try {
+ flushBatch();
+ } catch (IOException e) {
+ log.error("Error Flushing Stream Events", e);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ onExit(0);
+ }
+ };
+ }
+
+ protected void initSent() throws IOException {
+ long head = events.getHead();
+ long tail = events.getTail();
+ if (resume == -1 || resume > head) {
+ sent = head;
+ } else {
+ sent = resume;
+ }
+ if (sent < tail) {
+ sent = tail - 1;
+ }
+ }
+
+ protected void startFlush() throws IOException {
+ synchronized (crossThreadlock) {
+ if (flusherTask == null && !shuttingDown) {
+ if (sent < events.getHead()) {
+ flusherTask = threadPool.submit(flusherRunnable);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void onExit(final int rc) {
+ unsubscribe();
+ synchronized (crossThreadlock) {
+ shuttingDown = true;
+ }
+ super.onExit(rc);
+ }
+
+ @Override
+ public void destroy() {
+ unsubscribe();
+ synchronized (crossThreadlock) {
+ boolean alreadyShuttingDown = shuttingDown;
+ shuttingDown = true;
+ if (flusherTask != null) {
+ flusherTask.cancel(true);
+ } else if (!alreadyShuttingDown) {
+ onExit(0);
+ }
+ }
+ }
+
+ protected void subscribe() {
+ subscription =
+ subscriptionListeners.add(
+ new StreamEventListener() {
+ @Override
+ public void onStreamEventUpdate() {
+ try {
+ startFlush();
+ } catch (IOException e) {
+ log.error("Error starting to flushing Stream Events", e);
+ }
+ }
+ });
+ }
+
+ protected void unsubscribe() {
+ if (subscription != null) {
+ subscription.remove();
+ subscription = null;
+ }
+ }
+
+ protected void flushBatch() throws IOException {
+ String uuid = events.getUuid().toString();
+ int processed = 0;
+ long head = events.getHead();
+ while (sent < head && processed < BATCH_SIZE) {
+ long sending = sent + 1;
+ String event = events.get(sending);
+ if (Thread.interrupted() || stdout.checkError()) {
+ onExit(0);
+ return;
+ }
+ flush(uuid, sending, event);
+ sent = sending;
+ processed++;
+ }
+ synchronized (crossThreadlock) {
+ flusherTask = null;
+ }
+ startFlush();
+ }
+
+ protected void flush(String uuid, long number, String json) {
+ if (json != null) {
+ JsonElement el = parser.parse(json);
+ if (perms.isVisibleTo(el, currentUser)) {
+ if (includeIds) {
+ el.getAsJsonObject().addProperty("id", uuid + ":" + number);
+ json = gson.toJson(el);
+ }
+ flush(json + "\n");
+ }
+ }
+ }
+
+ protected void flush(String msg) {
+ synchronized (stdout) {
+ stdout.print(msg);
+ stdout.flush();
+ }
+ }
+}
diff --git a/src/main/resources/Documentation/cmd-stream.md b/src/main/resources/Documentation/cmd-stream.md
new file mode 100644
index 0000000..2e9b7ad
--- /dev/null
+++ b/src/main/resources/Documentation/cmd-stream.md
@@ -0,0 +1,280 @@
+@PLUGIN@ stream
+===============
+
+NAME
+----
+@PLUGIN@ stream - Monitor events occurring in real time
+
+SYNOPSIS
+--------
+```
+ssh -p @SSH_PORT@ @SSH_HOST@ @PLUGIN@ stream
+ [--ids]
+ [--resume-after <RESUME_AFTER>]
+```
+
+DESCRIPTION
+-----------
+
+Provides a portal into the major events occurring on the server,
+outputing activity data in real-time to the client. Events are
+filtered by the caller's access permissions, ensuring the caller
+only receives events for changes they can view on the web, or in
+the project repository.
+
+It is possible to make the events numbered so that clients may
+request the server to send all the known (and visible) events
+starting after a specific event using its id. This makes it
+possible to retrieve the events which occured when the client
+was disconnected.
+
+Event output is in JSON, one event per line.
+
+OPTIONS
+-----------
+**--ids**
+
+: add ids to events, useful for resuming after a disconnect
+(see --resume-after).
+
+**--resume-after**
+
+: event id after which to resume playing events on connection.
+
+
+ACCESS
+------
+Any user who has configured an SSH key.
+
+SCRIPTING
+---------
+This command is intended to be used in scripts.
+
+EXAMPLES
+--------
+
+```
+ $ ssh -p 29418 review.example.com @PLUGIN@ stream-events --ids
+ {"type":"comment-added", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:1"}
+ {"type":"change-merged", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:2"}
+
+```
+
+```
+ $ ssh -p 29418 review.example.com @PLUGIN@ stream-events --ids \
+ --resume-after bdedff7d-34fd-4459-a6a7-3f738f32c01d:1
+ {"type":"change-merged", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:2"}
+ {"type":"change-abandoned", ..., "id":"bdedff7d-34fd-4459-a6a7-3f738f32c01d:3"}
+
+```
+
+SCHEMA
+------
+The JSON messages consist of nested objects referencing the *change*,
+*patchSet*, *account* involved, and other attributes as appropriate.
+The currently supported message types are *patchset-created*,
+*draft-published*, *change-abandoned*, *change-restored*,
+*change-merged*, *merge-failed*, *comment-added*, *ref-updated* and
+*reviewer-added*.
+
+Note that any field may be missing in the JSON messages, so consumers of
+this JSON stream should deal with that appropriately.
+
+### Events
+
+#### Patchset Created
+
+**type**
+
+: "patchset-created"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+**uploader**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+#### Draft Published
+
+**type**
+
+: "draft-published"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchset**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+**uploader**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+#### Change Abandoned
+
+**type**
+
+: "change-abandoned"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**abandoner**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**reason**
+
+: Reason for abandoning the change.
+
+#### Change Restored
+
+**type**
+
+: "change-restored"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**restorer**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**reason**
+
+: Reason for restoring the change.
+
+#### Change Merged
+
+**type**
+
+: "change-merged"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**submitter**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+#### Merge Failed
+
+**type**
+
+: "merge-failed"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**submitter**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**reason**
+
+: Reason that the merge failed.
+
+#### Comment Added
+
+**type**
+
+: "comment-added"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchSet**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+
+**author**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**approvals**
+
+: All [approval attributes](../../../Documentation/json.html#approval)
+
+**comment**
+
+: Comment text author had written
+
+#### Ref Updated
+
+**type**
+
+: "ref-updated"
+
+**submitter**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+**refUpdate**
+
+: [refUpdate attribute](../../../Documentation/json.html#refUpdate)
+
+#### Reviewer Added
+
+**type**
+
+: "reviewer-added"
+
+**change**
+
+: [change attribute](../../../Documentation/json.html#change)
+
+
+**patchset**
+
+: [patchSet attribute](../../../Documentation/json.html#patchSet)
+
+**reviewer**
+
+: [account attribute](../../../Documentation/json.html#account)
+
+
+SEE ALSO
+--------
+
+* [JSON Data Formats](../../../Documentation/json.html)
+* [Access Controls](../../../Documentation/access-control.html)
diff --git a/test/lib_result.sh b/test/lib_result.sh
new file mode 100755
index 0000000..e883b81
--- /dev/null
+++ b/test/lib_result.sh
@@ -0,0 +1,22 @@
+# ---- TEST RESULTS ----
+result() { # test [error_message]
+ local result=$?
+ if [ $result -eq 0 ] ; then
+ echo "PASSED - $1 test"
+ else
+ echo "*** FAILED *** - $1 test"
+ RESULT=$result
+ [ $# -gt 1 ] && echo "$2"
+ fi
+}
+
+# output must match expected to pass
+result_out() { # test expected output
+ local disp=$(echo "Expected Output:" ;\
+ echo " $2" ;\
+ echo "Actual Output:" ;\
+ echo " $3")
+
+ [ "$2" = "$3" ]
+ result "$1" "$disp"
+}
diff --git a/test/test_events_plugin.sh b/test/test_events_plugin.sh
new file mode 100755
index 0000000..9af3d4a
--- /dev/null
+++ b/test/test_events_plugin.sh
@@ -0,0 +1,247 @@
+#!/bin/bash
+
+q() { "$@" > /dev/null 2>&1 ; } # cmd [args...] # quiet a command
+gssh() { ssh -p 29418 -x "$SERVER" "$@" 2>&1 ; } # run a gerrit ssh command
+mygit() { git --work-tree="$REPO_DIR" --git-dir="$GIT_DIR" "$@" ; } # [args...]
+
+cleanup() {
+ wait_event
+ (kill_captures ; sleep 1 ; kill_captures -9 ) &
+}
+
+# > uuid
+gen_uuid() { uuidgen | openssl dgst -sha1 -binary | xxd -p; }
+
+gen_commit_msg() { # msg > commit_msg
+ local msg=$1
+ echo "$msg
+
+Change-Id: I$(gen_uuid)"
+}
+
+get_change_num() { # < gerrit_push_response > changenum
+ local url=$(awk '/New Changes:/ { getline; print $2 }')
+ echo "${url##*\/}" | tr -d -c '[:digit:]'
+}
+
+create_change() { # [--dependent] [--draft] branch file [commit_message] > changenum
+ local opt_d opt_c opt_draft=false
+ [ "$1" = "--dependent" ] && { opt_d=$1 ; shift ; }
+ [ "$1" = "--draft" ] && { opt_draft=true ; shift ; }
+ local branch=$1 tmpfile=$2 msg=$3 out rtn
+ local content=$RANDOM dest=refs/for/$branch
+ "$opt_draft" && dest=refs/drafts/$branch
+
+ if [ -z "$opt_d" ] ; then
+ out=$(mygit fetch "$GITURL" "$branch" 2>&1) ||\
+ cleanup "Failed to fetch $branch: $out"
+ out=$(mygit checkout FETCH_HEAD 2>&1) ||\
+ cleanup "Failed to checkout $branch: $out"
+ fi
+
+ echo -e "$content" > "$tmpfile"
+
+ out=$(mygit add "$tmpfile" 2>&1) || cleanup "Failed to git add: $out"
+
+ [ -n "$msg" ] || msg=$(gen_commit_msg "Add $tmpfile")
+
+ out=$(mygit commit -m "$msg" 2>&1) ||\
+ cleanup "Failed to commit change: $out"
+ [ -n "$VERBOSE" ] && echo " commit:$out" >&2
+
+ out=$(mygit push "$GITURL" "HEAD:$dest" 2>&1) ||\
+ cleanup "Failed to push change: $out"
+ out=$(echo "$out" | get_change_num) ; rtn=$? ; echo "$out"
+ [ -n "$VERBOSE" ] && echo " change:$out" >&2
+ return $rtn
+}
+
+review() { gssh gerrit review "$@" ; }
+
+submit() { # change,ps
+ local out=$(review "$1" --submit)
+ local acl_err="one or more approvals failed; review output above"
+ local conflict_err="The change could not be merged due to a path conflict."
+
+ if echo "$out" | grep -q "$acl_err" ; then
+ if ! echo "$out" | grep -q "$conflict_err" ; then
+ echo "$out"
+ echo "User needs ACLs to approve and submit changes to $REF_BRANCH"
+ exit 1
+ fi
+ fi
+}
+
+# ------------------------- Event Capturing ---------------------------
+
+kill_captures() { # sig
+ local pid
+ for pid in "${CAPTURE_PIDS[@]}" ; do
+ q kill $1 $pid
+ done
+}
+
+setup_captures() {
+ ssh -p 29418 -x "$SERVER" "${CORE_CMD[@]}" > "$EVENTS_CORE" &
+ CAPTURE_PIDS=("${CAPTURE_PIDS[@]}" $!)
+ ssh -p 29418 -x "$SERVER" "${PLUGIN_CMD[@]}" > "$EVENTS_PLUGIN" &
+ CAPTURE_PIDS=("${CAPTURE_PIDS[@]}" $!)
+}
+
+capture_events() { # count
+ local count=$1
+ [ -n "$count" ] || count=1
+ ssh -p 29418 -x "$SERVER" "${PLUGIN_CMD[@]}" > "$EVENT_FIFO" &
+ CAPTURE_PID_SSH=$!
+ head -n $count < "$EVENT_FIFO" > "$EVENTS" &
+ CAPTURE_PID_HEAD=$!
+ sleep 1
+}
+
+wait_event() {
+ (sleep 1 ; q kill -9 $CAPTURE_PID_SSH ; q kill -9 $CAPTURE_PID_HEAD ) &
+ q wait $CAPTURE_PID_SSH $CAPTURE_PID_HEAD
+}
+
+get_event() { # number
+ local number=$1
+ [ -n "$number" ] || number=1
+
+ awk "NR==$number" "$EVENTS"
+}
+
+result_type() { # test type [n]
+ local test=$1 type=$2 number=$3
+ [ -n "$number" ] || number=1
+ wait_event
+ local event=$(get_event "$number")
+ echo "$event" | grep -q "\"type\":\"$type\""
+ result "$test $type" "$event"
+}
+
+# ------------------------- Usage ---------------------------
+
+usage() { # [error_message]
+ cat <<-EOF
+Usage: $MYPROG [-s|--server <server>] [-p|--project <project>]
+ [-r|--ref <ref branch>] [-g|--plugin <plugin>] [-h|--help]
+
+ -h|--help usage/help
+ -s|--server <server> server to use for the test (default: localhost)
+ -p|--project <project> git project to use (default: project0)
+ -r|--ref <ref branch> reference branch used to create branches (default: master)
+ --approvals <approvals> approvals needed for submit (default: --code-review 2)
+ --plugin-cmd <cmd> event streaming command for plugin (default: <plugin> stream)
+ --core-cmd <cmd> event streaming command for core (default: gerrit stream-events)
+EOF
+
+ [ -n "$1" ] && echo -e '\n'"ERROR: $1"
+ exit 1
+}
+
+parseArgs() {
+ SERVER="localhost"
+ PROJECT="tools/test/project0"
+ REF_BRANCH="master"
+ APPROVALS="--code-review 2"
+ CORE_CMD=(gerrit stream-events)
+ PLUGIN_CMD=(events stream)
+ while (( "$#" )) ; do
+ case "$1" in
+ --server|-s) shift; SERVER=$1 ;;
+ --project|-p) shift; PROJECT=$1 ;;
+ --ref|-r) shift; REF_BRANCH=$1 ;;
+ --approvals) shift; APPROVALS=$1 ;;
+ --plugin-cmd) shift; PLUGIN_CMD=($1) ;;
+ --core-cmd) shift; CORE_CMD=($1) ;;
+ --help|-h) usage ;;
+ --verbose|-v) VERBOSE=$1 ;;
+ *) usage "invalid argument '$1'" ;;
+ esac
+ shift
+ done
+
+ [ -n "$SERVER" ] || usage "server not set"
+ [ -n "$PROJECT" ] || usage "project not set"
+ [ -n "$REF_BRANCH" ] || usage "ref branch not set"
+}
+
+MYPROG=$(basename "$0")
+MYDIR=$(dirname "$0")
+
+source "$MYDIR/lib_result.sh"
+
+parseArgs "$@"
+
+TEST_DIR=$(readlink -f "$MYDIR/../target/test")
+rm -rf "$TEST_DIR"
+mkdir -p "$TEST_DIR"
+
+GITURL=ssh://$SERVER:29418/$PROJECT
+DEST_REF=$REF_BRANCH
+echo "$REF_BRANCH" | grep -q '^refs/' || DEST_REF=refs/heads/$REF_BRANCH
+git ls-remote "$GITURL" | grep -q "$DEST_REF" || usage "invalid project/server/ref"
+
+REPO_DIR=$TEST_DIR/repo
+q git init "$REPO_DIR"
+GIT_DIR="$REPO_DIR/.git"
+FILE_A="$REPO_DIR/fileA"
+
+EVENTS_CORE=$TEST_DIR/events-core
+EVENTS_PLUGIN=$TEST_DIR/events-plugin
+EVENT_FIFO=$TEST_DIR/event-fifo
+EVENTS=$TEST_DIR/events
+mkfifo "$EVENT_FIFO"
+
+trap cleanup EXIT
+
+setup_captures
+
+# ------------------------- Individual Event Tests ---------------------------
+GROUP=visible-events
+type=patchset-created
+capture_events
+ch1=$(create_change --draft "$REF_BRANCH" "$FILE_A") || exit
+result_type "$GROUP" "$type"
+
+type=draft-published
+capture_events
+review "$ch1,1" --publish
+result_type "$GROUP" "$type"
+
+type=change-abandoned
+capture_events
+review "$ch1,1" --abandon
+result_type "$GROUP" "$type"
+
+type=change-restored
+capture_events
+review "$ch1,1" --restore
+result_type "$GROUP" "$type"
+
+type=comment-added
+capture_events
+review "$ch1,1" --message "my_comment" $APPROVALS
+result_type "$GROUP" "$type"
+
+ch2=$(create_change "$REF_BRANCH" "$FILE_A") || exit
+review "$ch2,1" $APPROVALS
+
+type=change-merged
+capture_events 2
+submit "$ch1,1"
+result_type "$GROUP" "ref-updated"
+result_type "$GROUP" "$type" 2
+
+type=merge-failed
+capture_events
+submit "$ch2,1"
+result_type "$GROUP" "$type"
+
+# reviewer-added needs to be tested via Rest-API
+
+# ------------------------- Compare them all to Core -------------------------
+
+out=$(diff "$EVENTS_CORE" "$EVENTS_PLUGIN")
+result "core/plugin diff" "$out"
+