Add upsert to Access interface
Being able to update-or-insert a record makes it easier to avoid
using a Transaction in some cases. SQL doesn't do upsert well,
but non-SQL systems like Apache Cassandra can.
For SQL we try to fake it by updating first, and if that fails,
performing an insert. There is still a possible race condition,
a different thread could perform the insert between our update
failure and our insert attempt.
Change-Id: I384beda849cb815314530440e1bfdb502dadc95a
Signed-off-by: Shawn O. Pearce <sop@google.com>
diff --git a/src/main/java/com/google/gwtorm/client/Access.java b/src/main/java/com/google/gwtorm/client/Access.java
index 1d7ab84..1cf5331 100644
--- a/src/main/java/com/google/gwtorm/client/Access.java
+++ b/src/main/java/com/google/gwtorm/client/Access.java
@@ -138,6 +138,27 @@
void update(Iterable<T> instances, Transaction txn) throws OrmException;
/**
+ * Immediately update or insert entities in the data store.
+ *
+ * @param instances the instances to update. The iteration occurs only once.
+ * @throws OrmException data modification failed.
+ * @throws UnsupportedOperationException no PrimaryKey was declared.
+ */
+ void upsert(Iterable<T> instances) throws OrmException;
+
+ /**
+ * Update or insert entities in the data store.
+ *
+ * @param instances the instances to update. The iteration occurs only once.
+ * @param txn transaction to batch the operation into. If not null the data
+ * store changes will be delayed to {@link Transaction#commit()} is
+ * invoked; if null the operation occurs immediately.
+ * @throws OrmException data modification failed.
+ * @throws UnsupportedOperationException no PrimaryKey was declared.
+ */
+ void upsert(Iterable<T> instances, Transaction txn) throws OrmException;
+
+ /**
* Immediately delete existing entities from the data store.
*
* @param instances the instances to delete. The iteration occurs only once.
diff --git a/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java b/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java
index 9103773..56e976b 100644
--- a/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java
+++ b/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java
@@ -77,6 +77,19 @@
}
}
+ public final void upsert(final Iterable<E> instances) throws OrmException {
+ doUpsert(instances, null);
+ }
+
+ public final void upsert(final Iterable<E> instances, final Transaction txn)
+ throws OrmException {
+ if (txn != null) {
+ cast(txn).queueUpsert(this, instances);
+ } else {
+ upsert(instances);
+ }
+ }
+
public final void delete(final Iterable<E> instances) throws OrmException {
doDelete(instances, null);
}
@@ -96,6 +109,9 @@
protected abstract void doUpdate(Iterable<E> instances, T txn)
throws OrmException;
+ protected abstract void doUpsert(Iterable<E> instances, T txn)
+ throws OrmException;
+
protected abstract void doDelete(Iterable<E> instances, T txn)
throws OrmException;
diff --git a/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java b/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java
index a424f8f..def8385 100644
--- a/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java
+++ b/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java
@@ -30,11 +30,13 @@
protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingInsert;
protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingUpdate;
+ protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingUpsert;
protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingDelete;
protected AbstractTransaction() {
pendingInsert = newMap();
pendingUpdate = newMap();
+ pendingUpsert = newMap();
pendingDelete = newMap();
}
@@ -48,6 +50,9 @@
for (Action<?, Key<?>, AbstractTransaction> a : pendingUpdate.values()) {
a.doUpdate(this);
}
+ for (Action<?, Key<?>, AbstractTransaction> a : pendingUpsert.values()) {
+ a.doUpsert(this);
+ }
}
<E, K extends Key<?>, T extends AbstractTransaction> void queueInsert(
@@ -60,6 +65,11 @@
queue(pendingUpdate, access, list);
}
+ <E, K extends Key<?>, T extends AbstractTransaction> void queueUpsert(
+ final AbstractAccess<E, ?, T> access, final Iterable<E> list) {
+ queue(pendingUpsert, access, list);
+ }
+
<E, K extends Key<?>, T extends AbstractTransaction> void queueDelete(
final AbstractAccess<E, ?, T> access, final Iterable<E> list) {
queue(pendingDelete, access, list);
@@ -117,6 +127,10 @@
access.doUpdate(instances, t);
}
+ void doUpsert(final T t) throws OrmException {
+ access.doUpsert(instances, t);
+ }
+
void doDelete(final T t) throws OrmException {
access.doDelete(instances, t);
}
diff --git a/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java b/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java
index aa5775f..0304a2e 100644
--- a/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java
+++ b/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java
@@ -197,6 +197,55 @@
}
@Override
+ protected void doUpsert(final Iterable<T> instances, final JdbcTransaction txn)
+ throws OrmException {
+ // Assume update first, it will cheaply tell us if the row is missing.
+ //
+ Collection<T> inserts = null;
+ try {
+ final PreparedStatement ps;
+
+ ps = schema.getConnection().prepareStatement(getUpdateOneSql());
+ try {
+ int cnt = 0;
+ for (final T o : instances) {
+ bindOneUpdate(ps, o);
+ ps.addBatch();
+ cnt++;
+ }
+
+ final int[] states = ps.executeBatch();
+ if (states == null) {
+ inserts = new ArrayList<T>(cnt);
+ for(T o : instances){
+ inserts.add(o);
+ }
+ } else {
+ int i = 0;
+ for (T o : instances) {
+ if (states.length <= i || states[i] != 1) {
+ if (inserts == null) {
+ inserts = new ArrayList<T>(cnt - i);
+ }
+ inserts.add(o);
+ }
+ i++;
+ }
+ }
+
+ } finally {
+ ps.close();
+ }
+ } catch (SQLException e) {
+ throw convertError("update", e);
+ }
+
+ if (inserts != null) {
+ doInsert(inserts, txn);
+ }
+ }
+
+ @Override
protected void doDelete(final Iterable<T> instances, final JdbcTransaction txn)
throws OrmException {
try {