Use multiple key lookup for secondary row scans

When a secondary index row is scanned and it doesn't cache the main
object data, load all of the matching object rows in one call to
the database.  This permits the implementation to batch the lookups
into a single call, reducing total latency.

Change-Id: I353329e65f0c75dcfe134a506292b999c97e4648
Signed-off-by: Shawn O. Pearce <sop@google.com>
diff --git a/src/main/java/com/google/gwtorm/nosql/generic/CandidateRow.java b/src/main/java/com/google/gwtorm/nosql/generic/CandidateRow.java
new file mode 100644
index 0000000..15ecbad
--- /dev/null
+++ b/src/main/java/com/google/gwtorm/nosql/generic/CandidateRow.java
@@ -0,0 +1,53 @@
+// Copyright 2010 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gwtorm.nosql.generic;
+
+import com.google.gwtorm.nosql.IndexRow;
+
+class CandidateRow {
+  private final byte[] indexKey;
+  private final IndexRow indexRow;
+  private byte[] objData;
+
+  CandidateRow(byte[] ik, IndexRow ir) {
+    indexKey = ik;
+    indexRow = ir;
+    objData = indexRow.getDataCopy();
+  }
+
+  byte[] getIndexKey() {
+    return indexKey;
+  }
+
+  IndexRow getIndexRow() {
+    return indexRow;
+  }
+
+  byte[] getDataKey() {
+    return indexRow.getDataKey();
+  }
+
+  boolean hasData() {
+    return objData != null;
+  }
+
+  byte[] getData() {
+    return objData;
+  }
+
+  void setData(byte[] objData) {
+    this.objData = objData;
+  }
+}
diff --git a/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java b/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java
index 2a5b294..8c4df87 100644
--- a/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java
+++ b/src/main/java/com/google/gwtorm/nosql/generic/GenericAccess.java
@@ -27,11 +27,14 @@
 import com.google.gwtorm.nosql.IndexKeyBuilder;
 import com.google.gwtorm.nosql.IndexRow;
 import com.google.gwtorm.nosql.NoSqlAccess;
+import com.google.protobuf.ByteString;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map.Entry;
 
 /** Base implementation for {@link Access} in a {@link GenericDatabase}. */
@@ -221,49 +224,91 @@
     byte[] lastKey = fromKey;
 
     SCAN: for (;;) {
-      int scanned = 0;
-      ResultSet<Row> rs = db.scan(lastKey, toKey, limit, order);
-      for (Row ent : rs) {
-        final byte[] idxkey = ent.getKey();
-        lastKey = idxkey;
-        scanned++;
+      List<CandidateRow> scanned;
+      if (0 < limit) {
+        scanned = new ArrayList<CandidateRow>(limit);
+      } else {
+        scanned = new ArrayList<CandidateRow>();
+      }
 
-        // Decode the row and try to get the object data. If its
-        // not stored in this row in the secondary index we need
-        // to get the authoritative copy from the main index.
+      boolean needData = false;
+      for (Row ent : db.scan(lastKey, toKey, limit, order)) {
+        byte[] idxKey = ent.getKey();
+        IndexRow idxRow = IndexRow.CODEC.decode(ent.getValue());
+        CandidateRow row = new CandidateRow(idxKey, idxRow);
+        scanned.add(row);
+        needData |= !row.hasData();
+        lastKey = idxKey;
+      }
+
+      if (needData) {
+        // At least one row from the index didn't have a cached copy of the
+        // object stored within. For these rows we need to fetch the real
+        // data row and join it against the index information.
         //
-        final IndexRow r = IndexRow.CODEC.decode(ent.getValue());
-        byte[] objData = r.getDataCopy();
-        if (objData == null) {
-          b = new IndexKeyBuilder();
-          b.add(getRelationName());
-          b.delimiter();
-          b.addRaw(r.getDataKey());
-          objData = db.fetchRow(b.toByteArray());
+        HashMap<ByteString, CandidateRow> byKey =
+            new HashMap<ByteString, CandidateRow>();
+        List<byte[]> toFetch = new ArrayList<byte[]>(scanned.size());
+
+        for (CandidateRow idxRow : scanned) {
+          if (!idxRow.hasData()) {
+            IndexKeyBuilder pk = new IndexKeyBuilder();
+            pk.add(getRelationName());
+            pk.delimiter();
+            pk.addRaw(idxRow.getDataKey());
+            byte[] key = pk.toByteArray();
+
+            byKey.put(ByteString.copyFrom(key), idxRow);
+            toFetch.add(key);
+          }
         }
 
-        // If we have no data present and this row is stale enough,
-        // drop the row out of the index.
-        //
-        if (objData == null) {
-          db.maybeFossilCollectIndexRow(now, idxkey, r);
-          continue;
+        for (Row objRow : db.fetchRows(toFetch)) {
+          CandidateRow idxRow = byKey.get(ByteString.copyFrom(objRow.getKey()));
+          if (idxRow != null) {
+            idxRow.setData(objRow.getValue());
+          }
         }
 
-        // Verify the object still matches the predicate of the index.
-        // If it does, include it in the result. Otherwise, maybe we
-        // should drop it from the index.
+        for (CandidateRow idxRow : scanned) {
+          // If we have no data present and this row is stale enough,
+          // drop the row out of the index.
+          //
+          if (!idxRow.hasData()) {
+            db.maybeFossilCollectIndexRow(now, idxRow.getIndexKey(), //
+                idxRow.getIndexRow());
+            continue;
+          }
+
+          // Verify the object still matches the predicate of the index.
+          // If it does, include it in the result. Otherwise, maybe we
+          // should drop it from the index.
+          //
+          byte[] bin = idxRow.getData();
+          final T obj = getObjectCodec().decode(bin);
+          if (matches(idx, obj, idxRow.getIndexKey())) {
+            cache().put(primaryKey(obj), bin);
+            res.add(obj);
+            if (limit > 0 && res.size() == limit) {
+              break SCAN;
+            }
+          } else {
+            db.maybeFossilCollectIndexRow(now, idxRow.getIndexKey(), //
+                idxRow.getIndexRow());
+          }
+        }
+      } else {
+        // All of the rows are using a cached copy of the object. We can
+        // simply decode and produce those without further validation.
         //
-        final T obj = getObjectCodec().decode(objData);
-        if (matches(idx, obj, idxkey)) {
-          cache().put(primaryKey(obj), objData);
+        for (CandidateRow idxRow : scanned) {
+          byte[] bin = idxRow.getData();
+          T obj = getObjectCodec().decode(bin);
+          cache().put(primaryKey(obj), bin);
           res.add(obj);
           if (limit > 0 && res.size() == limit) {
-            rs.close();
             break SCAN;
           }
-        } else {
-          db.maybeFossilCollectIndexRow(now, idxkey, r);
         }
       }
 
@@ -271,8 +316,7 @@
       // If scanned < limit, we saw every index row that might be
       // a match, and no further rows would exist.
       //
-      if (limit == 0 || scanned < limit) {
-        rs.close();
+      if (limit == 0 || scanned.size() < limit) {
         break SCAN;
       }
 
@@ -523,8 +567,7 @@
     encodePrimaryKey(b, primaryKey(obj));
     final byte[] key = b.toByteArray();
 
-    return IndexRow.CODEC.encodeToByteString(IndexRow.forKey(now, key))
-        .toByteArray();
+    return IndexRow.CODEC.encodeToByteArray(IndexRow.forKey(now, key));
   }
 
   private static class IndexException extends RuntimeException {