Add upsert to Access interface

Being able to update-or-insert a record makes it easier to avoid
using a Transaction in some cases.  SQL doesn't do upsert well,
but non-SQL systems like Apache Cassandra can.

For SQL we try to fake it by updating first, and if that fails,
performing an insert.  There is still a possible race condition,
a different thread could perform the insert between our update
failure and our insert attempt.

Change-Id: I384beda849cb815314530440e1bfdb502dadc95a
Signed-off-by: Shawn O. Pearce <sop@google.com>
diff --git a/src/main/java/com/google/gwtorm/client/Access.java b/src/main/java/com/google/gwtorm/client/Access.java
index 1d7ab84..1cf5331 100644
--- a/src/main/java/com/google/gwtorm/client/Access.java
+++ b/src/main/java/com/google/gwtorm/client/Access.java
@@ -138,6 +138,27 @@
   void update(Iterable<T> instances, Transaction txn) throws OrmException;
 
   /**
+   * Immediately update or insert entities in the data store.
+   *
+   * @param instances the instances to update. The iteration occurs only once.
+   * @throws OrmException data modification failed.
+   * @throws UnsupportedOperationException no PrimaryKey was declared.
+   */
+  void upsert(Iterable<T> instances) throws OrmException;
+
+  /**
+   * Update or insert entities in the data store.
+   *
+   * @param instances the instances to update. The iteration occurs only once.
+   * @param txn transaction to batch the operation into. If not null the data
+   *        store changes will be delayed to {@link Transaction#commit()} is
+   *        invoked; if null the operation occurs immediately.
+   * @throws OrmException data modification failed.
+   * @throws UnsupportedOperationException no PrimaryKey was declared.
+   */
+  void upsert(Iterable<T> instances, Transaction txn) throws OrmException;
+
+  /**
    * Immediately delete existing entities from the data store.
    *
    * @param instances the instances to delete. The iteration occurs only once.
diff --git a/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java b/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java
index 9103773..56e976b 100644
--- a/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java
+++ b/src/main/java/com/google/gwtorm/client/impl/AbstractAccess.java
@@ -77,6 +77,19 @@
     }
   }
 
+  public final void upsert(final Iterable<E> instances) throws OrmException {
+    doUpsert(instances, null);
+  }
+
+  public final void upsert(final Iterable<E> instances, final Transaction txn)
+      throws OrmException {
+    if (txn != null) {
+      cast(txn).queueUpsert(this, instances);
+    } else {
+      upsert(instances);
+    }
+  }
+
   public final void delete(final Iterable<E> instances) throws OrmException {
     doDelete(instances, null);
   }
@@ -96,6 +109,9 @@
   protected abstract void doUpdate(Iterable<E> instances, T txn)
       throws OrmException;
 
+  protected abstract void doUpsert(Iterable<E> instances, T txn)
+      throws OrmException;
+
   protected abstract void doDelete(Iterable<E> instances, T txn)
       throws OrmException;
 
diff --git a/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java b/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java
index a424f8f..def8385 100644
--- a/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java
+++ b/src/main/java/com/google/gwtorm/client/impl/AbstractTransaction.java
@@ -30,11 +30,13 @@
 
   protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingInsert;
   protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingUpdate;
+  protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingUpsert;
   protected final Map<Object, Action<?, Key<?>, AbstractTransaction>> pendingDelete;
 
   protected AbstractTransaction() {
     pendingInsert = newMap();
     pendingUpdate = newMap();
+    pendingUpsert = newMap();
     pendingDelete = newMap();
   }
 
@@ -48,6 +50,9 @@
     for (Action<?, Key<?>, AbstractTransaction> a : pendingUpdate.values()) {
       a.doUpdate(this);
     }
+    for (Action<?, Key<?>, AbstractTransaction> a : pendingUpsert.values()) {
+      a.doUpsert(this);
+    }
   }
 
   <E, K extends Key<?>, T extends AbstractTransaction> void queueInsert(
@@ -60,6 +65,11 @@
     queue(pendingUpdate, access, list);
   }
 
+  <E, K extends Key<?>, T extends AbstractTransaction> void queueUpsert(
+      final AbstractAccess<E, ?, T> access, final Iterable<E> list) {
+    queue(pendingUpsert, access, list);
+  }
+
   <E, K extends Key<?>, T extends AbstractTransaction> void queueDelete(
       final AbstractAccess<E, ?, T> access, final Iterable<E> list) {
     queue(pendingDelete, access, list);
@@ -117,6 +127,10 @@
       access.doUpdate(instances, t);
     }
 
+    void doUpsert(final T t) throws OrmException {
+      access.doUpsert(instances, t);
+    }
+
     void doDelete(final T t) throws OrmException {
       access.doDelete(instances, t);
     }
diff --git a/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java b/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java
index aa5775f..0304a2e 100644
--- a/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java
+++ b/src/main/java/com/google/gwtorm/jdbc/JdbcAccess.java
@@ -197,6 +197,55 @@
   }
 
   @Override
+  protected void doUpsert(final Iterable<T> instances, final JdbcTransaction txn)
+      throws OrmException {
+    // Assume update first, it will cheaply tell us if the row is missing.
+    //
+    Collection<T> inserts = null;
+    try {
+      final PreparedStatement ps;
+
+      ps = schema.getConnection().prepareStatement(getUpdateOneSql());
+      try {
+        int cnt = 0;
+        for (final T o : instances) {
+          bindOneUpdate(ps, o);
+          ps.addBatch();
+          cnt++;
+        }
+
+        final int[] states = ps.executeBatch();
+        if (states == null) {
+          inserts = new ArrayList<T>(cnt);
+          for(T o : instances){
+            inserts.add(o);
+          }
+        } else {
+          int i = 0;
+          for (T o : instances) {
+            if (states.length <= i || states[i] != 1) {
+              if (inserts == null) {
+                inserts = new ArrayList<T>(cnt - i);
+              }
+              inserts.add(o);
+            }
+            i++;
+          }
+        }
+
+      } finally {
+        ps.close();
+      }
+    } catch (SQLException e) {
+      throw convertError("update", e);
+    }
+
+    if (inserts != null) {
+      doInsert(inserts, txn);
+    }
+  }
+
+  @Override
   protected void doDelete(final Iterable<T> instances, final JdbcTransaction txn)
       throws OrmException {
     try {