Use batch inserts instead of storing events in individual transactions

Change-Id: Ib1531e98527223c27b3a8e71a6f0acba317248c3
diff --git a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
index 6885d44..d2c9557 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
@@ -36,6 +36,7 @@
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.HikariDataSource;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -43,20 +44,80 @@
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 class SQLClient {
+  static final int MAX_BATCH_SIZE = 100; // upper bound on events drained per flush() batch
+  static final int QUEUE_CAPACITY = 10000; // capacity of the in-memory event queue
+
   private static final FluentLogger log = FluentLogger.forEnclosingClass();
   private final Gson gson;
   private final SQLDialect databaseDialect;
+  private final BlockingQueue<ProjectEvent> eventQueue;
+  private final ScheduledExecutorService scheduler;
 
   private HikariDataSource ds;
 
   public SQLClient(HikariConfig config) {
     ds = new HikariDataSource(config);
-
+    eventQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); // bounded: offer() fails when full
     gson = new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
-
     databaseDialect = SQLDialect.fromJdbcUrl(config.getJdbcUrl());
+    scheduler = Executors.newSingleThreadScheduledExecutor();
+    scheduler.scheduleAtFixedRate(this::flush, 2, 2, TimeUnit.SECONDS); // first run after 2s
+  }
+
+  /** Drains the whole queue in batches of MAX_BATCH_SIZE; failures are logged, not rethrown. */
+  void flush() {
+    List<ProjectEvent> batch = new ArrayList<>(MAX_BATCH_SIZE);
+    while (eventQueue.drainTo(batch, MAX_BATCH_SIZE) > 0) {
+      try {
+        batchInsert(batch);
+      } catch (SQLException | RuntimeException e) {
+        // An exception escaping a scheduleAtFixedRate task would cancel every later run.
+        log.atSevere().withCause(e).log("Failed to batch insert events");
+        return;
+      }
+      batch.clear();
+    }
+  }
+
+  /**
+   * Inserts the events as one JDBC batch inside a single transaction (all rows or none).
+   *
+   * @param events the events to persist
+   * @throws SQLException if the batch fails; the transaction is rolled back before rethrowing
+   */
+  private void batchInsert(List<ProjectEvent> events) throws SQLException {
+    String template = "INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)";
+    String sql = String.format(template, TABLE_NAME, PROJECT_ENTRY, DATE_ENTRY, EVENT_ENTRY);
+    try (Connection conn = ds.getConnection()) {
+      // Under autocommit a mid-batch failure can leave only part of the batch stored.
+      // NOTE(review): the pool is expected to restore autocommit on return — confirm.
+      conn.setAutoCommit(false);
+      try (PreparedStatement ps = conn.prepareStatement(sql)) {
+        for (ProjectEvent e : events) {
+          String eventJson = gson.toJson(e);
+          // NOTE(review): escaping predates bound parameters; confirm Spanner still needs it.
+          if (databaseDialect == SQLDialect.SPANNER && eventJson != null) {
+            eventJson = eventJson.replace("\\n", "\\\\n");
+          }
+          ps.setString(1, e.getProjectNameKey().get());
+          ps.setTimestamp(2, Timestamp.from(Instant.ofEpochSecond(e.eventCreatedOn)));
+          ps.setString(3, eventJson);
+          ps.addBatch();
+        }
+        ps.executeBatch();
+        conn.commit();
+      } catch (SQLException | RuntimeException ex) {
+        conn.rollback();
+        throw ex;
+      }
+    }
   }
 
   /**
@@ -90,6 +151,8 @@
   }
 
   void close() {
+    scheduler.shutdownNow(); // stop first: no periodic flush may start during shutdown
+    flush(); // final drain of events still queued, before the pool is closed
     ds.close();
   }
 
@@ -111,39 +174,35 @@
   }
 
   /**
-   * Store the event in the database.
+   * Queue the event in memory; a later flush() writes it to the database.
    *
-   * @param event The event to store
-   * @throws SQLException If there was a problem with the database
+   * @param event the event to queue
+   * @throws EventsLogException if the queue is full and the event was not accepted
    */
-  void storeEvent(ProjectEvent event) throws SQLException {
-    storeEvent(
-        event.getProjectNameKey().get(),
-        Instant.ofEpochSecond(event.eventCreatedOn),
-        gson.toJson(event));
+  void storeEvent(ProjectEvent event) throws EventsLogException {
+    if (!eventQueue.offer(event)) {
+      throw new EventsLogException(String.format("Cannot offer event %s", gson.toJson(event)));
+    }
   }
 
-  /**
-   * Store the event in the database.
-   *
-   * @param projectName The project in which this event happened
-   * @param timestamp The instant at which this event took place
-   * @param event The event as a string
-   * @throws SQLException If there was a problem with the database
-   */
-  void storeEvent(String projectName, Instant timestamp, String event) throws SQLException {
-    switch (databaseDialect) {
-      case SPANNER:
-        if (event != null) {
-          event = event.replace("\\n", "\\\\n");
-        }
-        break;
-      default:
+  void storeEvent(String projectName, Instant timestamp, String eventJson) throws SQLException {
+    String sql =
+        "INSERT INTO "
+            + TABLE_NAME
+            + " ("
+            + PROJECT_ENTRY
+            + ", "
+            + DATE_ENTRY
+            + ", "
+            + EVENT_ENTRY
+            + ") VALUES (?, ?, ?)";
+    try (Connection conn = ds.getConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      ps.setString(1, projectName);
+      ps.setTimestamp(2, Timestamp.from(timestamp));
+      ps.setString(3, eventJson);
+      ps.executeUpdate();
     }
-    String values = format("VALUES('%s', '%s', '%s')", projectName, timestamp, event);
-    execute(
-        format("INSERT INTO %s(%s, %s, %s) ", TABLE_NAME, PROJECT_ENTRY, DATE_ENTRY, EVENT_ENTRY)
-            + values);
   }
 
   /**
diff --git a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
index 71144f0..bfbe6a8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
@@ -143,37 +143,50 @@
     if (projectName == null) {
       return;
     }
+    try {
+      getEventsDb().storeEvent(event);
+    } catch (EventsLogException e) {
+      log.atWarning().withCause(e).log("Cannot queue event");
+    }
+  }
+
+  /** Flushes queued events, retrying while the database connection keeps failing. */
+  public void flush() {
     int failedConnections = 0;
     boolean done = false;
     while (!done) {
-      done = true;
       try {
-        getEventsDb().storeEvent(event);
-      } catch (SQLException e) {
-        log.atWarning().withCause(e).log("Cannot store ChangeEvent for: %s}", projectName.get());
-        if (e.getCause() instanceof ConnectException
-            || e.getMessage().contains("terminating connection")) {
-          done = false;
+        getEventsDb().flush();
+        done = true;
+      } catch (Exception e) { // getMessage() may be null, hence String.valueOf below
+        if ((e.getCause() instanceof ConnectException)
+            || String.valueOf(e.getMessage()).contains("terminating connection")) {
+          failedConnections++;
           try {
             retryIfAllowed(failedConnections);
           } catch (InterruptedException e1) {
-            log.atWarning().log("Cannot store ChangeEvent for %s: Interrupted", projectName.get());
             Thread.currentThread().interrupt();
             return;
           }
-          failedConnections++;
+          done = failedConnections >= maxTries; // retries exhausted; no fake interrupt needed
+        } else {
+          log.atWarning().withCause(e).log("Cannot flush batched events");
+          return;
         }
       }
     }
   }
 
+  /** Sleeps before the next attempt, or takes the store offline once maxTries is reached. */
   private void retryIfAllowed(int failedConnections) throws InterruptedException {
-    if (failedConnections < maxTries - 1) {
-      log.atInfo().log("Retrying store event");
-      Thread.sleep(waitTime);
+    if (failedConnections < maxTries) {
+      log.atInfo().log("Retrying store events, attempt %d/%d", failedConnections, maxTries);
+      if (waitTime > 0) {
+        Thread.sleep(waitTime);
+      }
     } else {
-      log.atSevere().log("Failed to store event %d times", maxTries);
+      log.atSevere().log("Failed to store events %d times", maxTries);
       setOnline(false);
     }
   }
 
diff --git a/src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
similarity index 91%
rename from src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java
rename to src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
index 679383b..51b7f8e 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package com.ericsson.gerrit.plugins.eventslog;
+package com.ericsson.gerrit.plugins.eventslog.sql;
 
 import static com.google.common.truth.Truth.assertThat;
 
-import com.google.gerrit.acceptance.config.GerritConfig;
 import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
 import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
 import org.junit.Test;
 
 @TestPlugin(
@@ -30,14 +30,16 @@
   @Test
   @GerritConfig(name = "plugin.events-log.storeUrl", value = "jdbc:h2:mem:db")
   public void getEventsShallBeConsistent() throws Exception {
+    SQLStore store = plugin.getSysInjector().getInstance(SQLStore.class);
     String events = "/plugins/events-log/events/?t1=1970-01-01;t2=2999-01-01";
     String change1 = "refs/changes/01/1/1";
-
     createChange();
+    store.flush();
     String response = adminRestSession.get(events).getEntityContent();
     assertThat(response).contains(change1);
 
     createChange();
+    store.flush();
     response = adminRestSession.get(events).getEntityContent();
     assertThat(response).contains(change1);
     assertThat(response).contains("refs/changes/02/2/1");
diff --git a/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
index 963c0ae..6662848 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
@@ -17,6 +17,7 @@
 import static com.ericsson.gerrit.plugins.eventslog.sql.SQLTable.TABLE_NAME;
 import static com.google.common.truth.Truth.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -25,6 +26,7 @@
 import static org.mockito.Mockito.when;
 
 import com.ericsson.gerrit.plugins.eventslog.EventsLogConfig;
+import com.ericsson.gerrit.plugins.eventslog.EventsLogException;
 import com.ericsson.gerrit.plugins.eventslog.MalformedQueryException;
 import com.ericsson.gerrit.plugins.eventslog.ServiceUnavailableException;
 import com.google.common.collect.ImmutableList;
@@ -42,7 +44,6 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Instant;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -103,7 +104,9 @@
   @After
   public void tearDown() throws Exception {
     stat.execute("DROP TABLE IF EXISTS " + TABLE_NAME);
-    store.stop();
+    if (store != null) { // NOTE(review): presumably unset when a test never builds a store
+      store.stop();
+    }
   }
 
   @Test
@@ -112,7 +115,7 @@
     when(withUserMock.project(any(Project.NameKey.class))).thenReturn(forProjectMock);
     doNothing().when(forProjectMock).check(ProjectPermission.ACCESS);
     setUpClient();
-    store.storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
     List<String> events = store.queryChangeEvents(GENERIC_QUERY);
     String json = new Gson().toJson(mockEvent);
     assertThat(events).containsExactly(json).inOrder();
@@ -124,7 +127,6 @@
     config.setJdbcUrl(TEST_LOCAL_URL);
     localEventsDb = new SQLClient(config);
     localEventsDb.createDBIfNotCreated();
-    localEventsDb.storeEvent(mockEvent);
     store =
         new SQLStore(
             cfgMock,
@@ -134,11 +136,11 @@
             permissionBackendMock,
             logCleanerMock,
             PLUGIN_NAME);
-
     store.start();
-    ArgumentCaptor<Instant> captor = ArgumentCaptor.forClass(Instant.class);
-    verify(eventsDb).storeEvent(any(String.class), captor.capture(), any(String.class));
-    assertThat(captor.getValue()).isEqualTo(Instant.ofEpochSecond(mockEvent.eventCreatedOn));
+    storeThenFlush(store, mockEvent);
+    ArgumentCaptor<ProjectEvent> captor = ArgumentCaptor.forClass(ProjectEvent.class);
+    verify(eventsDb).storeEvent(captor.capture());
+    assertThat(captor.getValue().eventCreatedOn).isEqualTo(mockEvent.eventCreatedOn);
   }
 
   @Test
@@ -149,7 +151,7 @@
         .when(forProjectMock)
         .check(ProjectPermission.ACCESS);
     setUpClient();
-    store.storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
     List<String> events = store.queryChangeEvents(GENERIC_QUERY);
     assertThat(events).isEmpty();
   }
@@ -169,19 +171,28 @@
         .when(forProjectMock)
         .check(ProjectPermission.ACCESS);
     setUpClient();
-    store.storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
     List<String> events = store.queryChangeEvents(GENERIC_QUERY);
     assertThat(events).isEmpty();
   }
 
   @Test
   public void retryOnConnectException() throws Exception {
-    when(cfgMock.getMaxTries()).thenReturn(3);
-    Throwable[] exceptions = new Throwable[3];
-    Arrays.fill(exceptions, new SQLException(new ConnectException()));
     setUpClientMock();
-    doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent);
-    doThrow(exceptions).doNothing().when(eventsDb).queryOne();
+    when(cfgMock.getMaxTries()).thenReturn(3);
+    when(cfgMock.getWaitTime()).thenReturn(0);
+    doAnswer(
+            invocation -> {
+              throw new EventsLogException("Error", new ConnectException());
+            })
+        .doAnswer(
+            invocation -> {
+              throw new EventsLogException("Error", new ConnectException());
+            })
+        .doNothing()
+        .when(eventsDb)
+        .flush();
+
     store =
         new SQLStore(
             cfgMock,
@@ -193,19 +204,27 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
-    verify(eventsDb, times(3)).storeEvent(mockEvent);
-    verify(localEventsDb).storeEvent(mockEvent);
+    storeThenFlush(store, new MockEvent("retryConnect"));
+    verify(eventsDb, times(3)).flush();
   }
 
   @Test
   public void retryOnMessage() throws Exception {
-    when(cfgMock.getMaxTries()).thenReturn(3);
-    Throwable[] exceptions = new Throwable[3];
-    Arrays.fill(exceptions, new SQLException(TERM_CONN_MSG));
     setUpClientMock();
-    doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent);
-    doThrow(exceptions).doNothing().when(eventsDb).queryOne();
+    mockEvent = new MockEvent("test");
+    when(cfgMock.getMaxTries()).thenReturn(3);
+    when(cfgMock.getWaitTime()).thenReturn(0);
+    doAnswer(
+            invocation -> {
+              throw new EventsLogException(TERM_CONN_MSG, new SQLException(TERM_CONN_MSG));
+            })
+        .doAnswer(
+            invocation -> {
+              throw new EventsLogException(TERM_CONN_MSG, new SQLException(TERM_CONN_MSG));
+            })
+        .doNothing()
+        .when(eventsDb)
+        .flush();
 
     store =
         new SQLStore(
@@ -218,16 +237,21 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
-    verify(eventsDb, times(3)).storeEvent(mockEvent);
-    verify(localEventsDb).storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
+    verify(eventsDb, times(3)).flush();
   }
 
   @Test
   public void noRetryOnMessage() throws Exception {
     when(cfgMock.getMaxTries()).thenReturn(3);
     setUpClientMock();
-    doThrow(new SQLException(MSG)).when(eventsDb).storeEvent(mockEvent);
+    doAnswer(
+            invocation -> {
+              throw new SQLException(MSG);
+            })
+        .doNothing()
+        .when(eventsDb)
+        .flush();
 
     store =
         new SQLStore(
@@ -240,18 +264,22 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
-    verify(eventsDb, times(1)).storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
+    verify(eventsDb, times(1)).flush();
   }
 
   @Test
   public void noRetryOnZeroMaxTries() throws Exception {
     when(cfgMock.getMaxTries()).thenReturn(0);
-    Throwable[] exceptions = new Throwable[3];
-    Arrays.fill(exceptions, new SQLException(new ConnectException()));
     setUpClientMock();
-    doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent);
-    doThrow(exceptions).doNothing().when(eventsDb).queryOne();
+
+    doAnswer(
+            invocation -> {
+              throw new SQLException(TERM_CONN_MSG);
+            })
+        .doNothing()
+        .when(eventsDb)
+        .flush();
 
     store =
         new SQLStore(
@@ -264,8 +292,8 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
-    verify(eventsDb, times(1)).storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
+    verify(eventsDb, times(1)).flush();
   }
 
   @Test(expected = ServiceUnavailableException.class)
@@ -285,7 +313,7 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
     store.queryChangeEvents(GENERIC_QUERY);
   }
 
@@ -313,9 +341,9 @@
             PLUGIN_NAME);
 
     localEventsDb.createDBIfNotCreated();
-    localEventsDb.storeEvent(mockEvent);
-    localEventsDb.storeEvent(mockEvent2);
     store.start();
+    storeThenFlush(store, mockEvent);
+    storeThenFlush(store, mockEvent2);
 
     List<String> events = store.queryChangeEvents(GENERIC_QUERY);
     Gson gson = new Gson();
@@ -364,7 +392,7 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
     verify(localEventsDb).storeEvent(mockEvent);
   }
 
@@ -386,7 +414,7 @@
             PLUGIN_NAME);
 
     store.start();
-    store.storeEvent(mockEvent);
+    storeThenFlush(store, mockEvent);
     verify(localEventsDb).storeEvent(mockEvent);
   }
 
@@ -407,12 +435,6 @@
     store.start();
   }
 
-  private void setUpClientMock() throws SQLException {
-    eventsDb = mock(SQLClient.class);
-    localEventsDb = mock(SQLClient.class);
-    when(localEventsDb.dbExists()).thenReturn(true);
-  }
-
   /**
    * For this test we expect that if we can connect to main database, then we should come back
    * online and try setting up again. We just want to make sure that restoreEventsFromLocal gets
@@ -424,7 +446,11 @@
     eventsDb = new SQLClient(config);
     localEventsDb = mock(SQLClient.class);
     when(localEventsDb.dbExists()).thenReturn(true);
-    when(localEventsDb.getAll()).thenReturn(ImmutableList.of(mock(SQLEntry.class)));
+    SQLEntry entry =
+        new SQLEntry(
+            "proj", Instant.now(), "{\"type\":\"mock event\"}", 123L // dummy primary key
+            );
+    when(localEventsDb.getAll()).thenReturn(ImmutableList.of(entry));
 
     store =
         new SQLStore(
@@ -457,7 +483,6 @@
     config.setJdbcUrl(TEST_LOCAL_URL);
     localEventsDb = new SQLClient(config);
     localEventsDb.createDBIfNotCreated();
-    localEventsDb.storeEvent(mockEvent);
     doThrow(new SQLException(new ConnectException()))
         .doNothing()
         .when(eventsDb)
@@ -478,12 +503,26 @@
             PLUGIN_NAME);
 
     store.start();
+    storeThenFlush(store, mockEvent);
     verify(eventsDb).queryOne();
-    verify(eventsDb).storeEvent(any(String.class), any(Instant.class), any(String.class));
+    ArgumentCaptor<ProjectEvent> captor = ArgumentCaptor.forClass(ProjectEvent.class);
+    verify(eventsDb).storeEvent(captor.capture());
+    assertThat(captor.getValue().getProjectNameKey().get()).isEqualTo("mock project");
     List<SQLEntry> entries = localEventsDb.getAll();
     assertThat(entries).isEmpty();
   }
 
+  private void setUpClientMock() throws SQLException {
+    eventsDb = mock(SQLClient.class); // replaces the eventsDb client with a mock
+    localEventsDb = mock(SQLClient.class); // replaces the localEventsDb client with a mock
+    when(localEventsDb.dbExists()).thenReturn(true); // the mocked local DB reports it exists
+  }
+
+  private void storeThenFlush(SQLStore store, MockEvent event) throws Exception {
+    store.storeEvent(event); // hands the event to the store's client; nothing is flushed yet
+    store.flush(); // explicit flush so the test does not wait for the periodic one
+  }
+
   public class MockEvent extends ProjectEvent {
     public String project = "mock project";