Merge "Merge branch 'stable-3.3' into stable-3.4" into stable-3.4
diff --git a/java/com/google/gerrit/acceptance/SshSession.java b/java/com/google/gerrit/acceptance/SshSession.java
index 054e523..23bfd0b 100644
--- a/java/com/google/gerrit/acceptance/SshSession.java
+++ b/java/com/google/gerrit/acceptance/SshSession.java
@@ -19,6 +19,7 @@
 
 import com.google.gerrit.acceptance.testsuite.account.TestAccount;
 import com.google.gerrit.acceptance.testsuite.account.TestSshKeys;
+import java.io.Reader;
 import java.net.InetSocketAddress;
 
 public abstract class SshSession {
@@ -41,6 +42,14 @@
 
   public abstract int execAndReturnStatus(String command) throws Exception;
 
+  /**
+   * Executes {@code command} and returns a reader over its output.
+   *
+   * <p>Callers should close the returned reader when done; an implementation may release its
+   * underlying connection resources on close.
+   */
+  public abstract Reader execAndReturnReader(String command) throws Exception;
+
   private boolean hasError() {
     return error != null;
   }
diff --git a/java/com/google/gerrit/acceptance/SshSessionJsch.java b/java/com/google/gerrit/acceptance/SshSessionJsch.java
index 86cc438..a86c2d6 100644
--- a/java/com/google/gerrit/acceptance/SshSessionJsch.java
+++ b/java/com/google/gerrit/acceptance/SshSessionJsch.java
@@ -25,8 +25,11 @@
 import com.jcraft.jsch.Session;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
 import java.io.StringWriter;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.security.KeyPair;
 import java.security.KeyPairGenerator;
@@ -130,6 +133,39 @@
     }
   }
 
+  /**
+   * Runs {@code command} on a new exec channel and returns a UTF-8 reader over its output.
+   *
+   * <p>The channel stays connected after this method returns; closing the returned reader
+   * disconnects it, so callers must close the reader when done.
+   */
+  @Override
+  public Reader execAndReturnReader(String command) throws Exception {
+    ChannelExec channel = (ChannelExec) getJschSession().openChannel("exec");
+    try {
+      channel.setCommand(command);
+      // JSch documents that getInputStream() should be called before connect(); requesting it
+      // afterwards risks missing output that arrives while the channel is being set up.
+      InputStream stdout = channel.getInputStream();
+      channel.connect();
+
+      return new InputStreamReader(stdout, StandardCharsets.UTF_8) {
+        @Override
+        public void close() throws IOException {
+          try {
+            super.close();
+          } finally {
+            channel.disconnect();
+          }
+        }
+      };
+    } catch (Exception e) {
+      // Do not leak the channel when setup fails part-way through.
+      channel.disconnect();
+      throw e;
+    }
+  }
+
   private Session getJschSession() throws Exception {
     if (session == null) {
       KeyPair keyPair = sshKeys.getKeyPair(account);
diff --git a/java/com/google/gerrit/acceptance/SshSessionMina.java b/java/com/google/gerrit/acceptance/SshSessionMina.java
index 4514f44..4d8691b 100644
--- a/java/com/google/gerrit/acceptance/SshSessionMina.java
+++ b/java/com/google/gerrit/acceptance/SshSessionMina.java
@@ -26,8 +26,11 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.Reader;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.security.InvalidAlgorithmParameterException;
 import java.security.KeyPairGenerator;
@@ -119,6 +122,28 @@
     }
   }
 
+  /**
+   * Runs {@code command} and returns a UTF-8 reader over the output of the started process.
+   *
+   * <p>Closing the returned reader also destroys the process, so callers must close the
+   * reader when done.
+   */
+  @Override
+  public Reader execAndReturnReader(String command) throws Exception {
+    // Keep the process handle so it can be destroyed when the reader is closed.
+    Process process = getMinaSession().exec(command, 0);
+    return new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8) {
+      @Override
+      public void close() throws IOException {
+        try {
+          super.close();
+        } finally {
+          process.destroy();
+        }
+      }
+    };
+  }
+
   private SshdSession getMinaSession() throws Exception {
     if (session == null) {
       String username = getUsername();
diff --git a/javatests/com/google/gerrit/acceptance/ssh/StreamEventsIT.java b/javatests/com/google/gerrit/acceptance/ssh/StreamEventsIT.java
new file mode 100644
index 0000000..e0e1880
--- /dev/null
+++ b/javatests/com/google/gerrit/acceptance/ssh/StreamEventsIT.java
@@ -0,0 +1,104 @@
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.acceptance.ssh;
+
+import static com.google.gerrit.acceptance.WaitUtil.waitUntil;
+
+import com.google.common.base.Splitter;
+import com.google.gerrit.acceptance.AbstractDaemonTest;
+import com.google.gerrit.acceptance.NoHttpd;
+import com.google.gerrit.acceptance.Sandboxed;
+import com.google.gerrit.acceptance.UseSsh;
+import com.google.gerrit.extensions.api.changes.ChangeApi;
+import com.google.gerrit.extensions.api.changes.ReviewInput;
+import java.io.IOException;
+import java.io.Reader;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Checks that a review comment shows up in the output of {@code gerrit stream-events}. */
+@NoHttpd
+@UseSsh
+@Sandboxed
+public class StreamEventsIT extends AbstractDaemonTest {
+  private static final Duration MAX_DURATION_FOR_RECEIVING_EVENTS = Duration.ofSeconds(2);
+  private static final String TEST_REVIEW_COMMENT = "any comment";
+
+  /** Accumulates everything read from the stream across polls. */
+  private final StringBuilder eventsOutput = new StringBuilder();
+
+  private Reader streamEventsReader;
+
+  @Before
+  public void setup() throws Exception {
+    streamEventsReader = adminSshSession.execAndReturnReader("gerrit stream-events");
+  }
+
+  @After
+  public void closeStreamEvents() throws IOException {
+    // @After also runs when setup() failed, in which case there is nothing to close.
+    if (streamEventsReader != null) {
+      streamEventsReader.close();
+    }
+  }
+
+  @Test
+  public void commentOnChangeShowsUpInStreamEvents() throws Exception {
+    reviewChange(new ReviewInput().message(TEST_REVIEW_COMMENT));
+    waitForEvent(() -> pollEventsContaining(TEST_REVIEW_COMMENT).size() == 1);
+  }
+
+  /** Hands {@code waitCondition} to {@code waitUntil} with the receive timeout as the limit. */
+  private void waitForEvent(Supplier<Boolean> waitCondition) throws InterruptedException {
+    waitUntil(() -> waitCondition.get(), MAX_DURATION_FOR_RECEIVING_EVENTS);
+  }
+
+  /** Creates a new change and posts {@code reviewInput} on its current revision. */
+  private void reviewChange(ReviewInput reviewInput) throws Exception {
+    ChangeApi changeApi = gApi.changes().id(createChange().getChange().getId().get());
+    changeApi.current().review(reviewInput);
+  }
+
+  /**
+   * Drains whatever the stream has ready into {@link #eventsOutput} and returns the lines
+   * received so far that contain {@code reviewComment}.
+   */
+  private List<String> pollEventsContaining(String reviewComment) {
+    try {
+      char[] cbuf = new char[2048];
+      while (streamEventsReader.ready()) {
+        int charsRead = streamEventsReader.read(cbuf);
+        if (charsRead < 0) {
+          break;
+        }
+        // Append only what this read produced; the rest of the buffer is zeros or stale data
+        // that would otherwise be duplicated into the output.
+        eventsOutput.append(cbuf, 0, charsRead);
+      }
+      return StreamSupport.stream(
+              Splitter.on('\n').trimResults().split(eventsOutput.toString()).spliterator(), false)
+          .filter(event -> event.contains(reviewComment))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}