Use batch inserts instead of storing events in individual transactions Change-Id: Ib1531e98527223c27b3a8e71a6f0acba317248c3
diff --git a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java index 6885d44..d2c9557 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java +++ b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
@@ -36,6 +36,7 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -43,20 +44,80 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; class SQLClient { + static final int MAX_BATCH_SIZE = 100; + static final int QUEUE_CAPACITY = 10000; + private static final FluentLogger log = FluentLogger.forEnclosingClass(); private final Gson gson; private final SQLDialect databaseDialect; + private final BlockingQueue<ProjectEvent> eventQueue; + private final ScheduledExecutorService scheduler; private HikariDataSource ds; public SQLClient(HikariConfig config) { ds = new HikariDataSource(config); - + eventQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); gson = new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create(); - databaseDialect = SQLDialect.fromJdbcUrl(config.getJdbcUrl()); + scheduler = Executors.newSingleThreadScheduledExecutor(); + scheduler.scheduleAtFixedRate(this::flush, 2, 2, TimeUnit.SECONDS); + } + + void flush() { + List<ProjectEvent> batch = new ArrayList<>(); + eventQueue.drainTo(batch, MAX_BATCH_SIZE); + + if (batch.isEmpty()) { + return; + } + + try { + batchInsert(batch); + } catch (SQLException e) { + log.atSevere().withCause(e).log("Failed to batch insert events"); + } + } + + private void batchInsert(List<ProjectEvent> events) throws SQLException { + String sql = + "INSERT INTO " + + TABLE_NAME + + " (" + + PROJECT_ENTRY + + ", " + + DATE_ENTRY + + ", " + + EVENT_ENTRY + + ") VALUES (?, ?, ?)"; + + try (Connection conn = ds.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + for (ProjectEvent e : events) { + String projectName = e.getProjectNameKey().get(); + Instant ts = Instant.ofEpochSecond(e.eventCreatedOn); + String eventJson = gson.toJson(e); + + if (databaseDialect == SQLDialect.SPANNER && eventJson != null) { + eventJson = eventJson.replace("\\n", "\\\\n"); + } + + ps.setString(1, projectName); + ps.setTimestamp(2, Timestamp.from(ts)); + ps.setString(3, eventJson); + ps.addBatch(); + } + + ps.executeBatch(); + } } /** @@ -90,6 +151,8 @@ } void close() { + flush(); + scheduler.shutdownNow(); ds.close(); } @@ -111,39 +174,35 @@ } /** - * Store the event in the database. + * Queue the event in memory for processing. * - * @param event The event to store - * @throws SQLException If there was a problem with the database + * @throws EventsLogException If there was a problem queueing the event + * @param event the event to store */ - void storeEvent(ProjectEvent event) throws SQLException { - storeEvent( - event.getProjectNameKey().get(), - Instant.ofEpochSecond(event.eventCreatedOn), - gson.toJson(event)); + void storeEvent(ProjectEvent event) throws EventsLogException { + if (!eventQueue.offer(event)) { + throw new EventsLogException(String.format("Cannot offer event %s", gson.toJson(event))); + } } - /** - * Store the event in the database. - * - * @param projectName The project in which this event happened - * @param timestamp The instant at which this event took place - * @param event The event as a string - * @throws SQLException If there was a problem with the database - */ - void storeEvent(String projectName, Instant timestamp, String event) throws SQLException { - switch (databaseDialect) { - case SPANNER: - if (event != null) { - event = event.replace("\\n", "\\\\n"); - } - break; - default: + void storeEvent(String projectName, Instant timestamp, String eventJson) throws SQLException { + String sql = + "INSERT INTO " + + TABLE_NAME + + " (" + + PROJECT_ENTRY + + ", " + + DATE_ENTRY + + ", " + + EVENT_ENTRY + + ") VALUES (?, ?, ?)"; + try (Connection conn = ds.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setString(1, projectName); + ps.setTimestamp(2, Timestamp.from(timestamp)); + ps.setString(3, eventJson); + ps.executeUpdate(); } - String values = format("VALUES('%s', '%s', '%s')", projectName, timestamp, event); - execute( - format("INSERT INTO %s(%s, %s, %s) ", TABLE_NAME, PROJECT_ENTRY, DATE_ENTRY, EVENT_ENTRY) - + values); } /**
diff --git a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java index 71144f0..bfbe6a8 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java +++ b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
@@ -143,37 +143,50 @@ if (projectName == null) { return; } + try { + getEventsDb().storeEvent(event); + } catch (EventsLogException e) { + log.atWarning().withCause(e).log("Cannot queue event"); + } + } + + /** Triggers flushes with retry support. */ + public void flush() { int failedConnections = 0; boolean done = false; + while (!done) { - done = true; try { - getEventsDb().storeEvent(event); - } catch (SQLException e) { - log.atWarning().withCause(e).log("Cannot store ChangeEvent for: %s}", projectName.get()); - if (e.getCause() instanceof ConnectException + getEventsDb().flush(); + done = true; + } catch (Exception e) { + if ((e.getCause() instanceof ConnectException) || e.getMessage().contains("terminating connection")) { - done = false; + failedConnections++; try { retryIfAllowed(failedConnections); } catch (InterruptedException e1) { - log.atWarning().log("Cannot store ChangeEvent for %s: Interrupted", projectName.get()); Thread.currentThread().interrupt(); return; } - failedConnections++; + } else { + log.atWarning().withCause(e).log("Cannot flush batched events"); + return; } } } } private void retryIfAllowed(int failedConnections) throws InterruptedException { - if (failedConnections < maxTries - 1) { - log.atInfo().log("Retrying store event"); - Thread.sleep(waitTime); + if (failedConnections < maxTries) { + log.atInfo().log("Retrying store events, attempt %d/%d", failedConnections, maxTries); + if (waitTime > 0) { + Thread.sleep(waitTime); + } } else { - log.atSevere().log("Failed to store event %d times", maxTries); + log.atSevere().log("Failed to store events %d times", maxTries); setOnline(false); + throw new InterruptedException("Max retries exceeded"); } }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java similarity index 91% rename from src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java rename to src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java index 679383b..51b7f8e 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java +++ b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
@@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.ericsson.gerrit.plugins.eventslog; +package com.ericsson.gerrit.plugins.eventslog.sql; import static com.google.common.truth.Truth.assertThat; -import com.google.gerrit.acceptance.config.GerritConfig; import com.google.gerrit.acceptance.LightweightPluginDaemonTest; import com.google.gerrit.acceptance.TestPlugin; +import com.google.gerrit.acceptance.config.GerritConfig; import org.junit.Test; @TestPlugin( @@ -30,14 +30,16 @@ @Test @GerritConfig(name = "plugin.events-log.storeUrl", value = "jdbc:h2:mem:db") public void getEventsShallBeConsistent() throws Exception { + SQLStore store = plugin.getSysInjector().getInstance(SQLStore.class); String events = "/plugins/events-log/events/?t1=1970-01-01;t2=2999-01-01"; String change1 = "refs/changes/01/1/1"; - createChange(); + store.flush(); String response = adminRestSession.get(events).getEntityContent(); assertThat(response).contains(change1); createChange(); + store.flush(); response = adminRestSession.get(events).getEntityContent(); assertThat(response).contains(change1); assertThat(response).contains("refs/changes/02/2/1");
diff --git a/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java index 963c0ae..6662848 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
@@ -17,6 +17,7 @@ import static com.ericsson.gerrit.plugins.eventslog.sql.SQLTable.TABLE_NAME; import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -25,6 +26,7 @@ import static org.mockito.Mockito.when; import com.ericsson.gerrit.plugins.eventslog.EventsLogConfig; +import com.ericsson.gerrit.plugins.eventslog.EventsLogException; import com.ericsson.gerrit.plugins.eventslog.MalformedQueryException; import com.ericsson.gerrit.plugins.eventslog.ServiceUnavailableException; import com.google.common.collect.ImmutableList; @@ -42,7 +44,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Instant; -import java.util.Arrays; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -103,7 +104,9 @@ @After public void tearDown() throws Exception { stat.execute("DROP TABLE IF EXISTS " + TABLE_NAME); - store.stop(); + if (store != null) { + store.stop(); + } } @Test @@ -112,7 +115,7 @@ when(withUserMock.project(any(Project.NameKey.class))).thenReturn(forProjectMock); doNothing().when(forProjectMock).check(ProjectPermission.ACCESS); setUpClient(); - store.storeEvent(mockEvent); + storeThenFlush(store, mockEvent); List<String> events = store.queryChangeEvents(GENERIC_QUERY); String json = new Gson().toJson(mockEvent); assertThat(events).containsExactly(json).inOrder(); @@ -124,7 +127,6 @@ config.setJdbcUrl(TEST_LOCAL_URL); localEventsDb = new SQLClient(config); localEventsDb.createDBIfNotCreated(); - localEventsDb.storeEvent(mockEvent); store = new SQLStore( cfgMock, @@ -134,11 +136,11 @@ permissionBackendMock, logCleanerMock, PLUGIN_NAME); - store.start(); - ArgumentCaptor<Instant> captor = ArgumentCaptor.forClass(Instant.class); - verify(eventsDb).storeEvent(any(String.class), captor.capture(), any(String.class)); - assertThat(captor.getValue()).isEqualTo(Instant.ofEpochSecond(mockEvent.eventCreatedOn)); + storeThenFlush(store, mockEvent); + ArgumentCaptor<ProjectEvent> captor = ArgumentCaptor.forClass(ProjectEvent.class); + verify(eventsDb).storeEvent(captor.capture()); + assertThat(captor.getValue().eventCreatedOn).isEqualTo(mockEvent.eventCreatedOn); } @Test @@ -149,7 +151,7 @@ .when(forProjectMock) .check(ProjectPermission.ACCESS); setUpClient(); - store.storeEvent(mockEvent); + storeThenFlush(store, mockEvent); List<String> events = store.queryChangeEvents(GENERIC_QUERY); assertThat(events).isEmpty(); } @@ -169,19 +171,28 @@ .when(forProjectMock) .check(ProjectPermission.ACCESS); setUpClient(); - store.storeEvent(mockEvent); + storeThenFlush(store, mockEvent); List<String> events = store.queryChangeEvents(GENERIC_QUERY); assertThat(events).isEmpty(); } @Test public void retryOnConnectException() throws Exception { - when(cfgMock.getMaxTries()).thenReturn(3); - Throwable[] exceptions = new Throwable[3]; - Arrays.fill(exceptions, new SQLException(new ConnectException())); setUpClientMock(); - doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent); - doThrow(exceptions).doNothing().when(eventsDb).queryOne(); + when(cfgMock.getMaxTries()).thenReturn(3); + when(cfgMock.getWaitTime()).thenReturn(0); + doAnswer( + invocation -> { + throw new EventsLogException("Error", new ConnectException()); + }) + .doAnswer( + invocation -> { + throw new EventsLogException("Error", new ConnectException()); + }) + .doNothing() + .when(eventsDb) + .flush(); + store = new SQLStore( cfgMock, @@ -193,19 +204,27 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); - verify(eventsDb, times(3)).storeEvent(mockEvent); - verify(localEventsDb).storeEvent(mockEvent); + storeThenFlush(store, new MockEvent("retryConnect")); + verify(eventsDb, times(3)).flush(); } @Test public void retryOnMessage() throws Exception { - when(cfgMock.getMaxTries()).thenReturn(3); - Throwable[] exceptions = new Throwable[3]; - Arrays.fill(exceptions, new SQLException(TERM_CONN_MSG)); setUpClientMock(); - doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent); - doThrow(exceptions).doNothing().when(eventsDb).queryOne(); + mockEvent = new MockEvent("test"); + when(cfgMock.getMaxTries()).thenReturn(3); + when(cfgMock.getWaitTime()).thenReturn(0); + doAnswer( + invocation -> { + throw new EventsLogException(TERM_CONN_MSG, new SQLException(TERM_CONN_MSG)); + }) + .doAnswer( + invocation -> { + throw new EventsLogException(TERM_CONN_MSG, new SQLException(TERM_CONN_MSG)); + }) + .doNothing() + .when(eventsDb) + .flush(); store = new SQLStore( @@ -218,16 +237,21 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); - verify(eventsDb, times(3)).storeEvent(mockEvent); - verify(localEventsDb).storeEvent(mockEvent); + storeThenFlush(store, mockEvent); + verify(eventsDb, times(3)).flush(); } @Test public void noRetryOnMessage() throws Exception { when(cfgMock.getMaxTries()).thenReturn(3); setUpClientMock(); - doThrow(new SQLException(MSG)).when(eventsDb).storeEvent(mockEvent); + doAnswer( + invocation -> { + throw new SQLException(MSG); + }) + .doNothing() + .when(eventsDb) + .flush(); store = new SQLStore( @@ -240,18 +264,22 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); - verify(eventsDb, times(1)).storeEvent(mockEvent); + storeThenFlush(store, mockEvent); + verify(eventsDb, times(1)).flush(); } @Test public void noRetryOnZeroMaxTries() throws Exception { when(cfgMock.getMaxTries()).thenReturn(0); - Throwable[] exceptions = new Throwable[3]; - Arrays.fill(exceptions, new SQLException(new ConnectException())); setUpClientMock(); - doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent); - doThrow(exceptions).doNothing().when(eventsDb).queryOne(); + + doAnswer( + invocation -> { + throw new SQLException(TERM_CONN_MSG); + }) + .doNothing() + .when(eventsDb) + .flush(); store = new SQLStore( @@ -264,8 +292,8 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); - verify(eventsDb, times(1)).storeEvent(mockEvent); + storeThenFlush(store, mockEvent); + verify(eventsDb, times(1)).flush(); } @Test(expected = ServiceUnavailableException.class) @@ -285,7 +313,7 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); + storeThenFlush(store, mockEvent); store.queryChangeEvents(GENERIC_QUERY); } @@ -313,9 +341,9 @@ PLUGIN_NAME); localEventsDb.createDBIfNotCreated(); - localEventsDb.storeEvent(mockEvent); - localEventsDb.storeEvent(mockEvent2); store.start(); + storeThenFlush(store, mockEvent); + storeThenFlush(store, mockEvent2); List<String> events = store.queryChangeEvents(GENERIC_QUERY); Gson gson = new Gson(); @@ -364,7 +392,7 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); + storeThenFlush(store, mockEvent); verify(localEventsDb).storeEvent(mockEvent); } @@ -386,7 +414,7 @@ PLUGIN_NAME); store.start(); - store.storeEvent(mockEvent); + storeThenFlush(store, mockEvent); verify(localEventsDb).storeEvent(mockEvent); } @@ -407,12 +435,6 @@ store.start(); } - private void setUpClientMock() throws SQLException { - eventsDb = mock(SQLClient.class); - localEventsDb = mock(SQLClient.class); - when(localEventsDb.dbExists()).thenReturn(true); - } - /** * For this test we expect that if we can connect to main database, then we should come back * online and try setting up again. We just want to make sure that restoreEventsFromLocal gets @@ -424,7 +446,11 @@ eventsDb = new SQLClient(config); localEventsDb = mock(SQLClient.class); when(localEventsDb.dbExists()).thenReturn(true); - when(localEventsDb.getAll()).thenReturn(ImmutableList.of(mock(SQLEntry.class))); + SQLEntry entry = + new SQLEntry( + "proj", Instant.now(), "{\"type\":\"mock event\"}", 123L // dummy primary key + ); + when(localEventsDb.getAll()).thenReturn(ImmutableList.of(entry)); store = new SQLStore( @@ -457,7 +483,6 @@ config.setJdbcUrl(TEST_LOCAL_URL); localEventsDb = new SQLClient(config); localEventsDb.createDBIfNotCreated(); - localEventsDb.storeEvent(mockEvent); doThrow(new SQLException(new ConnectException())) .doNothing() .when(eventsDb) @@ -478,12 +503,26 @@ PLUGIN_NAME); store.start(); + storeThenFlush(store, mockEvent); verify(eventsDb).queryOne(); - verify(eventsDb).storeEvent(any(String.class), any(Instant.class), any(String.class)); + ArgumentCaptor<ProjectEvent> captor = ArgumentCaptor.forClass(ProjectEvent.class); + verify(eventsDb).storeEvent(captor.capture()); + assertThat(captor.getValue().getProjectNameKey().get()).isEqualTo("mock project"); List<SQLEntry> entries = localEventsDb.getAll(); assertThat(entries).isEmpty(); } + private void setUpClientMock() throws SQLException { + eventsDb = mock(SQLClient.class); + localEventsDb = mock(SQLClient.class); + when(localEventsDb.dbExists()).thenReturn(true); + } + + private void storeThenFlush(SQLStore store, MockEvent event) throws Exception { + store.storeEvent(event); + store.flush(); + } + public class MockEvent extends ProjectEvent { public String project = "mock project";