Use batch inserts instead of storing events in individual transactions
Change-Id: Ib1531e98527223c27b3a8e71a6f0acba317248c3
diff --git a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
index 6885d44..d2c9557 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLClient.java
@@ -36,6 +36,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -43,20 +44,80 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
class SQLClient {
+ static final int MAX_BATCH_SIZE = 100;
+ static final int QUEUE_CAPACITY = 10000;
+
private static final FluentLogger log = FluentLogger.forEnclosingClass();
private final Gson gson;
private final SQLDialect databaseDialect;
+ private final BlockingQueue<ProjectEvent> eventQueue;
+ private final ScheduledExecutorService scheduler;
private HikariDataSource ds;
public SQLClient(HikariConfig config) {
ds = new HikariDataSource(config);
-
+ eventQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
gson = new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create();
-
databaseDialect = SQLDialect.fromJdbcUrl(config.getJdbcUrl());
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ // scheduleAtFixedRate suppresses every later run once a run throws, and flush() itself only
+ // handles SQLException. Catch runtime failures here so periodic flushing keeps going.
+ scheduler.scheduleAtFixedRate(
+     () -> {
+       try {
+         flush();
+       } catch (RuntimeException e) {
+         log.atSevere().withCause(e).log("Periodic flush of queued events failed");
+       }
+     },
+     2,
+     2,
+     TimeUnit.SECONDS);
+ }
+
+ /**
+  * Writes queued events to the database in batches of at most {@link #MAX_BATCH_SIZE} until the
+  * queue is empty.
+  *
+  * <p>A batch that fails to insert has already been removed from the queue; it is logged and
+  * discarded. Draining then stops, so the remaining events stay queued for the next call instead
+  * of being pushed at a database that is currently failing.
+  */
+ void flush() {
+   List<ProjectEvent> batch = new ArrayList<>(MAX_BATCH_SIZE);
+   // Keep draining: a single batch per call would cap throughput at MAX_BATCH_SIZE per tick and
+   // leave events behind when this is used as the final flush.
+   while (eventQueue.drainTo(batch, MAX_BATCH_SIZE) > 0) {
+     try {
+       batchInsert(batch);
+     } catch (SQLException e) {
+       log.atSevere().withCause(e).log(
+           "Failed to batch insert events; dropping %d event(s)", batch.size());
+       return;
+     }
+     batch.clear();
+   }
+ }
+
+ /**
+  * Inserts the given events with one prepared statement, executed as a single JDBC batch.
+  *
+  * <p>For the Spanner dialect the serialized JSON has its {@code \n} escapes doubled before it is
+  * bound. NOTE(review): {@code storeEvent(String, Instant, String)} binds its JSON without this
+  * escaping; confirm whether it is still required now that values are bound parameters.
+  *
+  * @param events the events to insert
+  * @throws SQLException if obtaining the connection, preparing or executing the batch fails
+  */
+ private void batchInsert(List<ProjectEvent> events) throws SQLException {
+   String insertSql =
+       String.format(
+           "INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)",
+           TABLE_NAME, PROJECT_ENTRY, DATE_ENTRY, EVENT_ENTRY);
+   boolean spanner = databaseDialect == SQLDialect.SPANNER;
+
+   try (Connection connection = ds.getConnection();
+       PreparedStatement insert = connection.prepareStatement(insertSql)) {
+     for (ProjectEvent event : events) {
+       String project = event.getProjectNameKey().get();
+       Instant createdOn = Instant.ofEpochSecond(event.eventCreatedOn);
+       String json = gson.toJson(event);
+       if (spanner && json != null) {
+         json = json.replace("\\n", "\\\\n");
+       }
+
+       insert.setString(1, project);
+       insert.setTimestamp(2, Timestamp.from(createdOn));
+       insert.setString(3, json);
+       insert.addBatch();
+     }
+     insert.executeBatch();
+   }
}
/**
@@ -90,6 +151,8 @@
}
void close() {
+ // Stop the periodic task before the final flush: shutdownNow() alone does not wait for a
+ // running task, which could still hold a connection when the pool is closed below.
+ scheduler.shutdown();
+ try {
+   if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+     scheduler.shutdownNow();
+   }
+ } catch (InterruptedException e) {
+   scheduler.shutdownNow();
+   Thread.currentThread().interrupt();
+ }
+ try {
+   flush();
+ } finally {
+   // Always release the pool, even if the final flush fails unexpectedly.
ds.close();
+ }
}
@@ -111,39 +174,35 @@
}
/**
- * Store the event in the database.
+ * Queue the event in memory; it is written to the database by a later {@code flush()}.
*
- * @param event The event to store
- * @throws SQLException If there was a problem with the database
+ * @param event the event to queue
+ * @throws EventsLogException if the bounded queue does not accept the event (it is full)
*/
- void storeEvent(ProjectEvent event) throws SQLException {
- storeEvent(
- event.getProjectNameKey().get(),
- Instant.ofEpochSecond(event.eventCreatedOn),
- gson.toJson(event));
+ void storeEvent(ProjectEvent event) throws EventsLogException {
+ if (!eventQueue.offer(event)) {
+ throw new EventsLogException(String.format("Cannot offer event %s", gson.toJson(event)));
+ }
}
- /**
- * Store the event in the database.
- *
- * @param projectName The project in which this event happened
- * @param timestamp The instant at which this event took place
- * @param event The event as a string
- * @throws SQLException If there was a problem with the database
- */
- void storeEvent(String projectName, Instant timestamp, String event) throws SQLException {
- switch (databaseDialect) {
- case SPANNER:
- if (event != null) {
- event = event.replace("\\n", "\\\\n");
- }
- break;
- default:
+ /**
+ * Store a single, already serialized event in the database.
+ *
+ * @param projectName The project in which this event happened
+ * @param timestamp The instant at which this event took place
+ * @param eventJson The event as a string; bound as-is, without dialect-specific escaping
+ * @throws SQLException If there was a problem with the database
+ */
+ void storeEvent(String projectName, Instant timestamp, String eventJson) throws SQLException {
+ String sql =
+ "INSERT INTO "
+ + TABLE_NAME
+ + " ("
+ + PROJECT_ENTRY
+ + ", "
+ + DATE_ENTRY
+ + ", "
+ + EVENT_ENTRY
+ + ") VALUES (?, ?, ?)";
+ try (Connection conn = ds.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ ps.setString(1, projectName);
+ ps.setTimestamp(2, Timestamp.from(timestamp));
+ ps.setString(3, eventJson);
+ ps.executeUpdate();
}
- String values = format("VALUES('%s', '%s', '%s')", projectName, timestamp, event);
- execute(
- format("INSERT INTO %s(%s, %s, %s) ", TABLE_NAME, PROJECT_ENTRY, DATE_ENTRY, EVENT_ENTRY)
- + values);
}
/**
diff --git a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
index 71144f0..bfbe6a8 100644
--- a/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
+++ b/src/main/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStore.java
@@ -143,37 +143,50 @@
if (projectName == null) {
return;
}
+ try {
+ getEventsDb().storeEvent(event);
+ } catch (EventsLogException e) {
+ log.atWarning().withCause(e).log("Cannot queue event");
+ }
+ }
+
+ /**
+ * Flushes queued events to the events database, retrying on connection failures.
+ *
+ * <p>A failure counts as a connection failure when its cause is a {@link ConnectException} or
+ * its message contains "terminating connection". Any other failure is logged and ends the
+ * flush. Retrying stops when {@code retryIfAllowed} reports that the attempts are used up, or
+ * when the thread is interrupted while waiting between attempts.
+ */
+ public void flush() {
int failedConnections = 0;
boolean done = false;
+
while (!done) {
- done = true;
try {
- getEventsDb().storeEvent(event);
- } catch (SQLException e) {
- log.atWarning().withCause(e).log("Cannot store ChangeEvent for: %s}", projectName.get());
- if (e.getCause() instanceof ConnectException
- || e.getMessage().contains("terminating connection")) {
- done = false;
+ getEventsDb().flush();
+ done = true;
+ } catch (Exception e) {
+ // Any exception type is caught here, so the message can be null; guard before matching.
+ String message = e.getMessage();
+ if ((e.getCause() instanceof ConnectException)
+ || (message != null && message.contains("terminating connection"))) {
+ failedConnections++;
try {
- retryIfAllowed(failedConnections);
+ if (!retryIfAllowed(failedConnections)) {
+ // Attempts are used up; this is not an interruption, so leave the interrupt flag alone.
+ return;
+ }
} catch (InterruptedException e1) {
- log.atWarning().log("Cannot store ChangeEvent for %s: Interrupted", projectName.get());
Thread.currentThread().interrupt();
return;
}
- failedConnections++;
+ } else {
+ log.atWarning().withCause(e).log("Cannot flush batched events");
+ return;
}
}
}
}
- private void retryIfAllowed(int failedConnections) throws InterruptedException {
- if (failedConnections < maxTries - 1) {
- log.atInfo().log("Retrying store event");
- Thread.sleep(waitTime);
+ /**
+ * Decides whether another attempt may be made after a connection failure.
+ *
+ * <p>While {@code failedConnections} is below {@code maxTries} this sleeps for {@code waitTime}
+ * (when positive) and allows a retry; otherwise it marks the store offline.
+ *
+ * @param failedConnections number of failed attempts so far
+ * @return {@code true} if the caller should retry, {@code false} if the attempts are used up
+ * @throws InterruptedException only if the thread is interrupted while sleeping
+ */
+ private boolean retryIfAllowed(int failedConnections) throws InterruptedException {
+ if (failedConnections < maxTries) {
+ log.atInfo().log("Retrying store events, attempt %d/%d", failedConnections, maxTries);
+ if (waitTime > 0) {
+ Thread.sleep(waitTime);
+ }
+ return true;
} else {
- log.atSevere().log("Failed to store event %d times", maxTries);
+ log.atSevere().log("Failed to store events %d times", maxTries);
setOnline(false);
+ return false;
}
}
diff --git a/src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
similarity index 91%
rename from src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java
rename to src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
index 679383b..51b7f8e 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/eventslog/EventsLogIT.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/EventsLogIT.java
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package com.ericsson.gerrit.plugins.eventslog;
+package com.ericsson.gerrit.plugins.eventslog.sql;
import static com.google.common.truth.Truth.assertThat;
-import com.google.gerrit.acceptance.config.GerritConfig;
import com.google.gerrit.acceptance.LightweightPluginDaemonTest;
import com.google.gerrit.acceptance.TestPlugin;
+import com.google.gerrit.acceptance.config.GerritConfig;
import org.junit.Test;
@TestPlugin(
@@ -30,14 +30,16 @@
@Test
@GerritConfig(name = "plugin.events-log.storeUrl", value = "jdbc:h2:mem:db")
public void getEventsShallBeConsistent() throws Exception {
+ SQLStore store = plugin.getSysInjector().getInstance(SQLStore.class);
String events = "/plugins/events-log/events/?t1=1970-01-01;t2=2999-01-01";
String change1 = "refs/changes/01/1/1";
-
createChange();
+ store.flush();
String response = adminRestSession.get(events).getEntityContent();
assertThat(response).contains(change1);
createChange();
+ store.flush();
response = adminRestSession.get(events).getEntityContent();
assertThat(response).contains(change1);
assertThat(response).contains("refs/changes/02/2/1");
diff --git a/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
index 963c0ae..6662848 100644
--- a/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
+++ b/src/test/java/com/ericsson/gerrit/plugins/eventslog/sql/SQLStoreTest.java
@@ -17,6 +17,7 @@
import static com.ericsson.gerrit.plugins.eventslog.sql.SQLTable.TABLE_NAME;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -25,6 +26,7 @@
import static org.mockito.Mockito.when;
import com.ericsson.gerrit.plugins.eventslog.EventsLogConfig;
+import com.ericsson.gerrit.plugins.eventslog.EventsLogException;
import com.ericsson.gerrit.plugins.eventslog.MalformedQueryException;
import com.ericsson.gerrit.plugins.eventslog.ServiceUnavailableException;
import com.google.common.collect.ImmutableList;
@@ -42,7 +44,6 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -103,7 +104,9 @@
@After
public void tearDown() throws Exception {
stat.execute("DROP TABLE IF EXISTS " + TABLE_NAME);
- store.stop();
+ // Only stop a store that was created — presumably some tests never assign it; TODO confirm.
+ if (store != null) {
+ store.stop();
+ }
}
@Test
@@ -112,7 +115,7 @@
when(withUserMock.project(any(Project.NameKey.class))).thenReturn(forProjectMock);
doNothing().when(forProjectMock).check(ProjectPermission.ACCESS);
setUpClient();
- store.storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
List<String> events = store.queryChangeEvents(GENERIC_QUERY);
String json = new Gson().toJson(mockEvent);
assertThat(events).containsExactly(json).inOrder();
@@ -124,7 +127,6 @@
config.setJdbcUrl(TEST_LOCAL_URL);
localEventsDb = new SQLClient(config);
localEventsDb.createDBIfNotCreated();
- localEventsDb.storeEvent(mockEvent);
store =
new SQLStore(
cfgMock,
@@ -134,11 +136,11 @@
permissionBackendMock,
logCleanerMock,
PLUGIN_NAME);
-
store.start();
- ArgumentCaptor<Instant> captor = ArgumentCaptor.forClass(Instant.class);
- verify(eventsDb).storeEvent(any(String.class), captor.capture(), any(String.class));
- assertThat(captor.getValue()).isEqualTo(Instant.ofEpochSecond(mockEvent.eventCreatedOn));
+ storeThenFlush(store, mockEvent);
+ ArgumentCaptor<ProjectEvent> captor = ArgumentCaptor.forClass(ProjectEvent.class);
+ verify(eventsDb).storeEvent(captor.capture());
+ assertThat(captor.getValue().eventCreatedOn).isEqualTo(mockEvent.eventCreatedOn);
}
@Test
@@ -149,7 +151,7 @@
.when(forProjectMock)
.check(ProjectPermission.ACCESS);
setUpClient();
- store.storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
List<String> events = store.queryChangeEvents(GENERIC_QUERY);
assertThat(events).isEmpty();
}
@@ -169,19 +171,28 @@
.when(forProjectMock)
.check(ProjectPermission.ACCESS);
setUpClient();
- store.storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
List<String> events = store.queryChangeEvents(GENERIC_QUERY);
assertThat(events).isEmpty();
}
@Test
public void retryOnConnectException() throws Exception {
- when(cfgMock.getMaxTries()).thenReturn(3);
- Throwable[] exceptions = new Throwable[3];
- Arrays.fill(exceptions, new SQLException(new ConnectException()));
setUpClientMock();
- doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent);
- doThrow(exceptions).doNothing().when(eventsDb).queryOne();
+ when(cfgMock.getMaxTries()).thenReturn(3);
+ when(cfgMock.getWaitTime()).thenReturn(0);
+ doAnswer(
+ invocation -> {
+ throw new EventsLogException("Error", new ConnectException());
+ })
+ .doAnswer(
+ invocation -> {
+ throw new EventsLogException("Error", new ConnectException());
+ })
+ .doNothing()
+ .when(eventsDb)
+ .flush();
+
store =
new SQLStore(
cfgMock,
@@ -193,19 +204,27 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
- verify(eventsDb, times(3)).storeEvent(mockEvent);
- verify(localEventsDb).storeEvent(mockEvent);
+ storeThenFlush(store, new MockEvent("retryConnect"));
+ verify(eventsDb, times(3)).flush();
}
@Test
public void retryOnMessage() throws Exception {
- when(cfgMock.getMaxTries()).thenReturn(3);
- Throwable[] exceptions = new Throwable[3];
- Arrays.fill(exceptions, new SQLException(TERM_CONN_MSG));
setUpClientMock();
- doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent);
- doThrow(exceptions).doNothing().when(eventsDb).queryOne();
+ mockEvent = new MockEvent("test");
+ when(cfgMock.getMaxTries()).thenReturn(3);
+ when(cfgMock.getWaitTime()).thenReturn(0);
+ doAnswer(
+ invocation -> {
+ throw new EventsLogException(TERM_CONN_MSG, new SQLException(TERM_CONN_MSG));
+ })
+ .doAnswer(
+ invocation -> {
+ throw new EventsLogException(TERM_CONN_MSG, new SQLException(TERM_CONN_MSG));
+ })
+ .doNothing()
+ .when(eventsDb)
+ .flush();
store =
new SQLStore(
@@ -218,16 +237,21 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
- verify(eventsDb, times(3)).storeEvent(mockEvent);
- verify(localEventsDb).storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
+ verify(eventsDb, times(3)).flush();
}
@Test
public void noRetryOnMessage() throws Exception {
when(cfgMock.getMaxTries()).thenReturn(3);
setUpClientMock();
- doThrow(new SQLException(MSG)).when(eventsDb).storeEvent(mockEvent);
+ doAnswer(
+ invocation -> {
+ throw new SQLException(MSG);
+ })
+ .doNothing()
+ .when(eventsDb)
+ .flush();
store =
new SQLStore(
@@ -240,18 +264,22 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
- verify(eventsDb, times(1)).storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
+ verify(eventsDb, times(1)).flush();
}
@Test
public void noRetryOnZeroMaxTries() throws Exception {
when(cfgMock.getMaxTries()).thenReturn(0);
- Throwable[] exceptions = new Throwable[3];
- Arrays.fill(exceptions, new SQLException(new ConnectException()));
setUpClientMock();
- doThrow(exceptions).doNothing().when(eventsDb).storeEvent(mockEvent);
- doThrow(exceptions).doNothing().when(eventsDb).queryOne();
+
+ doAnswer(
+ invocation -> {
+ throw new SQLException(TERM_CONN_MSG);
+ })
+ .doNothing()
+ .when(eventsDb)
+ .flush();
store =
new SQLStore(
@@ -264,8 +292,8 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
- verify(eventsDb, times(1)).storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
+ verify(eventsDb, times(1)).flush();
}
@Test(expected = ServiceUnavailableException.class)
@@ -285,7 +313,7 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
store.queryChangeEvents(GENERIC_QUERY);
}
@@ -313,9 +341,9 @@
PLUGIN_NAME);
localEventsDb.createDBIfNotCreated();
- localEventsDb.storeEvent(mockEvent);
- localEventsDb.storeEvent(mockEvent2);
store.start();
+ storeThenFlush(store, mockEvent);
+ storeThenFlush(store, mockEvent2);
List<String> events = store.queryChangeEvents(GENERIC_QUERY);
Gson gson = new Gson();
@@ -364,7 +392,7 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
verify(localEventsDb).storeEvent(mockEvent);
}
@@ -386,7 +414,7 @@
PLUGIN_NAME);
store.start();
- store.storeEvent(mockEvent);
+ storeThenFlush(store, mockEvent);
verify(localEventsDb).storeEvent(mockEvent);
}
@@ -407,12 +435,6 @@
store.start();
}
- private void setUpClientMock() throws SQLException {
- eventsDb = mock(SQLClient.class);
- localEventsDb = mock(SQLClient.class);
- when(localEventsDb.dbExists()).thenReturn(true);
- }
-
/**
* For this test we expect that if we can connect to main database, then we should come back
* online and try setting up again. We just want to make sure that restoreEventsFromLocal gets
@@ -424,7 +446,11 @@
eventsDb = new SQLClient(config);
localEventsDb = mock(SQLClient.class);
when(localEventsDb.dbExists()).thenReturn(true);
- when(localEventsDb.getAll()).thenReturn(ImmutableList.of(mock(SQLEntry.class)));
+ SQLEntry entry =
+ new SQLEntry(
+ "proj", Instant.now(), "{\"type\":\"mock event\"}", 123L // dummy primary key
+ );
+ when(localEventsDb.getAll()).thenReturn(ImmutableList.of(entry));
store =
new SQLStore(
@@ -457,7 +483,6 @@
config.setJdbcUrl(TEST_LOCAL_URL);
localEventsDb = new SQLClient(config);
localEventsDb.createDBIfNotCreated();
- localEventsDb.storeEvent(mockEvent);
doThrow(new SQLException(new ConnectException()))
.doNothing()
.when(eventsDb)
@@ -478,12 +503,26 @@
PLUGIN_NAME);
store.start();
+ storeThenFlush(store, mockEvent);
verify(eventsDb).queryOne();
- verify(eventsDb).storeEvent(any(String.class), any(Instant.class), any(String.class));
+ ArgumentCaptor<ProjectEvent> captor = ArgumentCaptor.forClass(ProjectEvent.class);
+ verify(eventsDb).storeEvent(captor.capture());
+ assertThat(captor.getValue().getProjectNameKey().get()).isEqualTo("mock project");
List<SQLEntry> entries = localEventsDb.getAll();
assertThat(entries).isEmpty();
}
+ /** Replaces both SQL clients with mocks; the local mock reports that its database exists. */
+ private void setUpClientMock() throws SQLException {
+ eventsDb = mock(SQLClient.class);
+ localEventsDb = mock(SQLClient.class);
+ when(localEventsDb.dbExists()).thenReturn(true);
+ }
+
+ /** Hands the event to the store and flushes right away instead of waiting for a periodic flush. */
+ private void storeThenFlush(SQLStore store, MockEvent event) throws Exception {
+ store.storeEvent(event);
+ store.flush();
+ }
+
public class MockEvent extends ProjectEvent {
public String project = "mock project";