Check for NoteDb migration when loading/testing multi-site

The multi-site plugin is designed to work in conjunction with changes
meta-data on NoteDb under the '/meta' ref. Make sure that Gerrit
changes are migrated when starting the plugin and before running
any integration test.

Relax also the requirement about the number of events received
for asserting only the ones that are relevant to the tests.
NoteDb may generate extra events that are not necessarily interesting
for the multi-site plugin and can be thus safely ignored.

Change-Id: Ia7b9db125e96fafe4eaba1518ce8007ae632d8b2
diff --git a/README.md b/README.md
index 89f9e78..345fefb 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,9 @@
 - Connected to the same message broker
 - Accessible behind a load balancer (e.g., HAProxy)
 
+**NOTE**: The multi-site plugin will not start if Gerrit is not yet migrated
+to NoteDb.
+
 Currently, the only mode supported is one primary read/write master
 and multiple read-only masters but eventually the plan is to support N
 read/write masters. The read/write master is handling any traffic while the
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/GerritNoteDbStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/GerritNoteDbStatus.java
new file mode 100644
index 0000000..ff932da
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/GerritNoteDbStatus.java
@@ -0,0 +1,34 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.google.gerrit.server.notedb.NotesMigration;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+public class GerritNoteDbStatus implements NoteDbStatus {
+  private final NotesMigration notesMigration;
+
+  @Inject
+  public GerritNoteDbStatus(NotesMigration notesMigration) {
+    this.notesMigration = notesMigration;
+  }
+
+  @Override
+  public boolean enabled() {
+    return notesMigration.commitChangeWrites();
+  }
+}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
index 0469bf1..0d9ab52 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/Module.java
@@ -19,6 +19,7 @@
 import com.google.gson.Gson;
 import com.google.inject.Inject;
 import com.google.inject.Provides;
+import com.google.inject.ProvisionException;
 import com.google.inject.Singleton;
 import com.googlesource.gerrit.plugins.multisite.broker.GsonProvider;
 import com.googlesource.gerrit.plugins.multisite.cache.CacheModule;
@@ -42,14 +43,21 @@
 public class Module extends LifecycleModule {
   private static final Logger log = LoggerFactory.getLogger(Module.class);
   private final Configuration config;
+  private final NoteDbStatus noteDb;
 
   @Inject
-  public Module(Configuration config) {
+  public Module(Configuration config, NoteDbStatus noteDb) {
     this.config = config;
+    this.noteDb = noteDb;
   }
 
   @Override
   protected void configure() {
+    if (!noteDb.enabled()) {
+      throw new ProvisionException(
+          "Gerrit is still running on ReviewDb: please migrate to NoteDb "
+              + "and then reload the multi-site plugin.");
+    }
 
     listener().to(Log4jMessageLogger.class);
     bind(MessageLogger.class).to(Log4jMessageLogger.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/multisite/NoteDbStatus.java b/src/main/java/com/googlesource/gerrit/plugins/multisite/NoteDbStatus.java
new file mode 100644
index 0000000..f47e503
--- /dev/null
+++ b/src/main/java/com/googlesource/gerrit/plugins/multisite/NoteDbStatus.java
@@ -0,0 +1,29 @@
+// Copyright (C) 2019 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.googlesource.gerrit.plugins.multisite;
+
+import com.google.inject.ImplementedBy;
+
+/** Returns the status of changes migration. */
+@ImplementedBy(GerritNoteDbStatus.class)
+public interface NoteDbStatus {
+
+  /**
+   * Status of NoteDb migration.
+   *
+   * @return true if Gerrit has been migrated to NoteDb
+   */
+  boolean enabled();
+}
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
index cf64a92..030d670 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/ModuleTest.java
@@ -35,13 +35,15 @@
   @Mock(answer = Answers.RETURNS_DEEP_STUBS)
   private Configuration configMock;
 
+  @Mock private NoteDbStatus noteDb;
+
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private Module module;
 
   @Before
   public void setUp() {
-    module = new Module(configMock);
+    module = new Module(configMock, noteDb);
   }
 
   @Test
diff --git a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
index ea00251..fcedfaf 100644
--- a/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
+++ b/src/test/java/com/googlesource/gerrit/plugins/multisite/kafka/consumer/EventConsumerIT.java
@@ -25,15 +25,18 @@
 import com.google.gerrit.extensions.events.LifecycleListener;
 import com.google.gerrit.extensions.registration.DynamicSet;
 import com.google.gerrit.lifecycle.LifecycleModule;
+import com.google.inject.Inject;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.googlesource.gerrit.plugins.multisite.Configuration;
 import com.googlesource.gerrit.plugins.multisite.Module;
+import com.googlesource.gerrit.plugins.multisite.NoteDbStatus;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.eclipse.jgit.lib.Config;
+import org.junit.Before;
 import org.junit.Test;
 import org.testcontainers.containers.KafkaContainer;
 
@@ -44,7 +47,11 @@
     sysModule =
         "com.googlesource.gerrit.plugins.multisite.kafka.consumer.EventConsumerIT$KafkaTestContainerModule")
 public class EventConsumerIT extends LightweightPluginDaemonTest {
-  private static final int QUEUE_POLL_TIMEOUT_MSECS = 30000;
+  private static final int QUEUE_POLL_TIMEOUT_MSECS = 10000;
+
+  static {
+    System.setProperty("gerrit.notedb", "ON");
+  }
 
   public static class KafkaTestContainerModule extends LifecycleModule {
 
@@ -66,6 +73,13 @@
       }
     }
 
+    private final NoteDbStatus noteDb;
+
+    @Inject
+    public KafkaTestContainerModule(NoteDbStatus noteDb) {
+      this.noteDb = noteDb;
+    }
+
     @Override
     protected void configure() {
       final KafkaContainer kafka = new KafkaContainer();
@@ -77,20 +91,29 @@
       config.setBoolean("kafka", "subscriber", "enabled", true);
       Configuration multiSiteConfig = new Configuration(config);
       bind(Configuration.class).toInstance(multiSiteConfig);
-      install(new Module(multiSiteConfig));
+      install(new Module(multiSiteConfig, noteDb));
 
       listener().toInstance(new KafkaStopAtShutdown(kafka));
     }
   }
 
+  @Override
+  @Before
+  public void setUpTestPlugin() throws Exception {
+    super.setUpTestPlugin();
+
+    if (!notesMigration.commitChangeWrites()) {
+      throw new IllegalStateException("NoteDb is mandatory for running the multi-site plugin");
+    }
+  }
+
   @Test
   public void createChangeShouldPropagateChangeIndexAndRefUpdateStreamEvent() throws Exception {
     LinkedBlockingQueue<SourceAwareEventWrapper> droppedEventsQueue = captureDroppedEvents();
     drainQueue(droppedEventsQueue);
 
     createChange();
-    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue, 3);
-    assertThat(createdChangeEvents).hasSize(3);
+    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue);
 
     assertThat(createdChangeEvents).contains("change-index");
     assertThat(createdChangeEvents).contains("ref-updated");
@@ -107,9 +130,8 @@
     in.message = "LGTM";
     gApi.changes().id(r.getChangeId()).revision("current").review(in);
 
-    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue, 2);
+    List<String> createdChangeEvents = receiveFromQueue(droppedEventsQueue);
 
-    assertThat(createdChangeEvents).hasSize(2);
     assertThat(createdChangeEvents).contains("change-index");
     assertThat(createdChangeEvents).contains("comment-added");
   }
@@ -133,15 +155,12 @@
     return droppedEvents;
   }
 
-  private List<String> receiveFromQueue(
-      LinkedBlockingQueue<SourceAwareEventWrapper> queue, int numEvents)
+  private List<String> receiveFromQueue(LinkedBlockingQueue<SourceAwareEventWrapper> queue)
       throws InterruptedException {
     List<String> eventsList = new ArrayList<>();
-    for (int i = 0; i < numEvents; i++) {
-      SourceAwareEventWrapper event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS);
-      if (event != null) {
-        eventsList.add(event.getHeader().getEventType());
-      }
+    SourceAwareEventWrapper event;
+    while ((event = queue.poll(QUEUE_POLL_TIMEOUT_MSECS, TimeUnit.MILLISECONDS)) != null) {
+      eventsList.add(event.getHeader().getEventType());
     }
     return eventsList;
   }