// blob: ecfef3373e22cf68c543f72c7de9d232a7b1ff9c
// Copyright 2010 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gwtorm.nosql.generic;
import com.google.gwtorm.client.Key;
import com.google.gwtorm.nosql.IndexFunction;
import com.google.gwtorm.nosql.IndexKeyBuilder;
import com.google.gwtorm.nosql.IndexRow;
import com.google.gwtorm.nosql.NoSqlAccess;
import com.google.gwtorm.server.AbstractResultSet;
import com.google.gwtorm.server.Access;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.ListResultSet;
import com.google.gwtorm.server.OrmConcurrencyException;
import com.google.gwtorm.server.OrmDuplicateKeyException;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
/** Base implementation for {@link Access} in a {@link GenericDatabase}. */
public abstract class GenericAccess<T, K extends Key<?>> extends
NoSqlAccess<T, K> {
/** Maximum number of results to cache to improve updates on upsert. */
private static final int MAX_SZ = 64;
private final GenericSchema db;
private LinkedHashMap<K, byte[]> cache;
/**
 * Create an access instance bound to a schema.
 *
 * @param s the schema that all reads and writes of this access go through.
 */
protected GenericAccess(final GenericSchema s) {
  super(s);
  this.db = s;
}
/**
 * Obtain the cache of encoded rows recently seen by this access instance.
 * <p>
 * The map is created on first use. It is keyed by primary key and holds the
 * encoded row bytes. Entries are kept in insertion order and the eldest entry
 * is evicted once the map grows beyond {@code MAX_SZ} entries.
 *
 * @return the cache; never null.
 */
@SuppressWarnings("serial")
protected LinkedHashMap<K, byte[]> cache() {
  if (cache == null) {
    cache = new LinkedHashMap<K, byte[]>(8) {
      @Override
      protected boolean removeEldestEntry(Entry<K, byte[]> entry) {
        // This hook runs after the new entry has been inserted, so evict
        // only once the size exceeds the limit. Testing MAX_SZ <= size()
        // would cap the cache at MAX_SZ - 1 entries.
        return size() > MAX_SZ;
      }
    };
  }
  return cache;
}
/**
 * Lookup a single entity via its primary key.
 * <p>
 * When a row is found its encoded form is remembered in {@link #cache()}
 * under the primary key of the decoded object.
 *
 * @param key the primary key instance; must not be null.
 * @return the entity; null if no entity has this key.
 * @throws OrmException the data lookup failed.
 * @throws OrmDuplicateKeyException more than one row was identified in the
 *         key scan.
 */
@Override
public T get(K key) throws OrmException, OrmDuplicateKeyException {
  final byte[] raw = db.fetchRow(dataRowKey(key));
  if (raw == null) {
    return null;
  }

  final T entity = getObjectCodec().decode(raw);
  cache().put(primaryKey(entity), raw);
  return entity;
}
/**
 * Lookup multiple entities via their primary keys.
 * <p>
 * The supplied keys are translated to data row keys as the schema consumes
 * them. Each row is decoded when the caller advances the returned result set,
 * and its encoded form is remembered in {@link #cache()} at that time.
 *
 * @param keys the primary keys to look up.
 * @return result set over the decoded entities; closing it closes the
 *         underlying row result set.
 * @throws OrmException the data lookup failed.
 */
@Override
public ResultSet<T> get(final Iterable<K> keys) throws OrmException {
  // View of the primary keys as encoded data row keys, converted on demand.
  final Iterable<byte[]> rowKeys = new Iterable<byte[]>() {
    @Override
    public Iterator<byte[]> iterator() {
      final Iterator<K> src = keys.iterator();
      return new Iterator<byte[]>() {
        @Override
        public boolean hasNext() {
          return src.hasNext();
        }

        @Override
        public byte[] next() {
          return dataRowKey(src.next());
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }
      };
    }
  };

  final ResultSet<Row> rows = db.fetchRows(rowKeys);
  final Iterator<Row> rowItr = rows.iterator();
  return new AbstractResultSet<T>() {
    @Override
    protected boolean hasNext() {
      return rowItr.hasNext();
    }

    @Override
    protected T next() {
      final byte[] raw = rowItr.next().getValue();
      final T entity = getObjectCodec().decode(raw);
      cache().put(primaryKey(entity), raw);
      return entity;
    }

    @Override
    public void close() {
      rows.close();
    }
  };
}
/**
 * Scan a range of keys from the data rows and return any matching objects.
 * <p>
 * Both range bounds are prefixed with the relation name and a delimiter
 * before being handed to the schema. Rows are decoded, and remembered in
 * {@link #cache()}, as the caller advances the returned result set.
 *
 * @param fromKey key to start the scan on. This is inclusive.
 * @param toKey key to stop the scan on. This is exclusive.
 * @param limit maximum number of results to return.
 * @param order if true the order will be preserved, false if the result
 *        order can be arbitrary.
 * @return result set for the requested range. The result set may be lazily
 *         filled, or filled completely.
 * @throws OrmException an error occurred preventing the scan from completing.
 */
@Override
protected ResultSet<T> scanPrimaryKey(byte[] fromKey, byte[] toKey,
    int limit, boolean order) throws OrmException {
  final IndexKeyBuilder lower = new IndexKeyBuilder();
  lower.add(getRelationName());
  lower.delimiter();
  lower.addRaw(fromKey);

  final IndexKeyBuilder upper = new IndexKeyBuilder();
  upper.add(getRelationName());
  upper.delimiter();
  upper.addRaw(toKey);

  final ResultSet<Row> rows =
      db.scan(lower.toByteArray(), upper.toByteArray(), limit, order);
  final Iterator<Row> rowItr = rows.iterator();

  return new AbstractResultSet<T>() {
    @Override
    protected boolean hasNext() {
      return rowItr.hasNext();
    }

    @Override
    protected T next() {
      final byte[] raw = rowItr.next().getValue();
      final T entity = getObjectCodec().decode(raw);
      cache().put(primaryKey(entity), raw);
      return entity;
    }

    @Override
    public void close() {
      rows.close();
    }
  };
}
/**
 * Scan a range of index keys and return any matching objects.
 * <p>
 * Index rows are read in batches. Rows that do not carry a copy of the object
 * are joined against their data rows, and each joined object is verified
 * against the index with {@link #matches(IndexFunction, Object, byte[])}.
 * Index rows whose data row is missing, or whose object no longer matches,
 * are offered to the schema for fossil collection and left out of the result.
 * If a batch was full but the result is still short of {@code limit}, the
 * scan resumes just after the last index key seen.
 * <p>
 * Every object placed in the result also has its encoded form remembered in
 * {@link #cache()}.
 *
 * @param idx the index function describing the index to scan.
 * @param fromKey key to start the scan on. This is inclusive.
 * @param toKey key to stop the scan on. This is exclusive.
 * @param limit maximum number of results to return; 0 means no limit.
 * @param order if true the order will be preserved, false if the result
 *        order can be arbitrary.
 * @return result set for the requested range, filled completely.
 * @throws OrmException an error occurred preventing the scan from completing.
 */
@Override
protected ResultSet<T> scanIndex(IndexFunction<T> idx, byte[] fromKey,
    byte[] toKey, int limit, boolean order) throws OrmException {
  final long now = System.currentTimeMillis();

  // Qualify both bounds with "<relation>.<index>" and a delimiter so the
  // scan stays within the rows of this particular index.
  IndexKeyBuilder b;

  b = new IndexKeyBuilder();
  b.add(getRelationName());
  b.add('.');
  b.add(idx.getName());
  b.delimiter();
  b.addRaw(fromKey);
  fromKey = b.toByteArray();

  b = new IndexKeyBuilder();
  b.add(getRelationName());
  b.add('.');
  b.add(idx.getName());
  b.delimiter();
  b.addRaw(toKey);
  toKey = b.toByteArray();

  final ArrayList<T> res = new ArrayList<>();
  byte[] lastKey = fromKey;

  SCAN: for (;;) {
    List<CandidateRow> scanned;
    if (0 < limit) {
      scanned = new ArrayList<>(limit);
    } else {
      scanned = new ArrayList<>();
    }

    boolean needData = false;
    final ResultSet<Row> idxRows = db.scan(lastKey, toKey, limit, order);
    try {
      for (Row ent : idxRows) {
        byte[] idxKey = ent.getKey();
        IndexRow idxRow = IndexRow.CODEC.decode(ent.getValue());
        CandidateRow row = new CandidateRow(idxKey, idxRow);
        scanned.add(row);
        needData |= !row.hasData();
        lastKey = idxKey;
      }
    } finally {
      // Release the scan even if decoding a row fails part way through.
      idxRows.close();
    }

    if (needData) {
      // At least one row from the index didn't have a cached copy of the
      // object stored within. For these rows we need to fetch the real
      // data row and join it against the index information.
      //
      HashMap<ByteString, CandidateRow> byKey = new HashMap<>();
      List<byte[]> toFetch = new ArrayList<>(scanned.size());

      for (CandidateRow idxRow : scanned) {
        if (!idxRow.hasData()) {
          IndexKeyBuilder pk = new IndexKeyBuilder();
          pk.add(getRelationName());
          pk.delimiter();
          pk.addRaw(idxRow.getDataKey());
          byte[] key = pk.toByteArray();

          // byte[] has identity-based equals/hashCode, so wrap the key in a
          // ByteString to be able to look it up by content below.
          byKey.put(ByteString.copyFrom(key), idxRow);
          toFetch.add(key);
        }
      }

      final ResultSet<Row> dataRows = db.fetchRows(toFetch);
      try {
        for (Row objRow : dataRows) {
          CandidateRow idxRow = byKey.get(ByteString.copyFrom(objRow.getKey()));
          if (idxRow != null) {
            idxRow.setData(objRow.getValue());
          }
        }
      } finally {
        dataRows.close();
      }

      for (CandidateRow idxRow : scanned) {
        // If we have no data present and this row is stale enough,
        // drop the row out of the index.
        //
        if (!idxRow.hasData()) {
          db.maybeFossilCollectIndexRow(now, idxRow.getIndexKey(), //
              idxRow.getIndexRow());
          continue;
        }

        // Verify the object still matches the predicate of the index.
        // If it does, include it in the result. Otherwise, maybe we
        // should drop it from the index.
        //
        byte[] bin = idxRow.getData();
        final T obj = getObjectCodec().decode(bin);
        if (matches(idx, obj, idxRow.getIndexKey())) {
          cache().put(primaryKey(obj), bin);
          res.add(obj);
          if (limit > 0 && res.size() == limit) {
            break SCAN;
          }
        } else {
          db.maybeFossilCollectIndexRow(now, idxRow.getIndexKey(), //
              idxRow.getIndexRow());
        }
      }
    } else {
      // All of the rows are using a cached copy of the object. We can
      // simply decode and produce those without further validation.
      //
      for (CandidateRow idxRow : scanned) {
        byte[] bin = idxRow.getData();
        T obj = getObjectCodec().decode(bin);
        cache().put(primaryKey(obj), bin);
        res.add(obj);
        if (limit > 0 && res.size() == limit) {
          break SCAN;
        }
      }
    }

    // If we have no limit we scanned everything, so break out.
    // If scanned < limit, we saw every index row that might be
    // a match, and no further rows would exist.
    //
    if (limit == 0 || scanned.size() < limit) {
      break SCAN;
    }

    // Otherwise we have to scan again starting after lastKey. Appending a
    // NUL produces the next key after lastKey, so the inclusive start of the
    // next scan does not return the same row again.
    //
    b = new IndexKeyBuilder();
    b.addRaw(lastKey);
    b.nul();
    lastKey = b.toByteArray();
  }
  return new ListResultSet<>(res);
}
/**
 * Insert new entities, then flush the schema.
 *
 * @param instances the entities to insert; each is written along with its
 *        secondary index rows.
 * @throws OrmException one of the writes, or the final flush, failed.
 */
@Override
public void insert(Iterable<T> instances) throws OrmException {
  for (T fresh : instances) {
    insertOne(fresh);
  }
  db.flush();
}
/**
 * Write the secondary index rows for one new entity, then insert its data
 * row.
 *
 * @param nObj the entity to insert.
 * @throws OrmException an index row or the data row could not be written.
 */
private void insertOne(T nObj) throws OrmException {
  // Index rows go first, the data row follows.
  writeNewIndexes(null, nObj);

  final byte[] rowKey = dataRowKey(primaryKey(nObj));
  final byte[] rowData = getObjectCodec().encodeToByteString(nObj).toByteArray();
  db.insert(rowKey, rowData);
}
/**
 * Update existing entities, then flush the schema.
 *
 * @param instances the entities to update; each must already exist.
 * @throws OrmException one of the writes, or the final flush, failed.
 */
@Override
public void update(Iterable<T> instances) throws OrmException {
  for (T changed : instances) {
    upsertOne(changed, true);
  }
  db.flush();
}
/**
 * Insert or update entities, then flush the schema.
 *
 * @param instances the entities to store; they may or may not exist yet.
 * @throws OrmException one of the writes, or the final flush, failed.
 */
@Override
public void upsert(Iterable<T> instances) throws OrmException {
  for (T item : instances) {
    upsertOne(item, false);
  }
  db.flush();
}
/**
 * Store one entity, maintaining its secondary index rows.
 * <p>
 * The previous version of the entity is taken from {@link #cache()} when
 * available. Otherwise, if {@code mustExist} is true it is fetched from the
 * schema; if {@code mustExist} is false the entity is written without any
 * knowledge of a previous version, and no old index rows are pruned.
 *
 * @param newObj the entity to store.
 * @param mustExist true if the entity is required to already exist.
 * @throws OrmConcurrencyException {@code mustExist} is true but no previous
 *         version was found in the cache or the schema.
 * @throws OrmException an index row or the data row could not be written.
 */
private void upsertOne(T newObj, boolean mustExist) throws OrmException {
  final K pk = primaryKey(newObj);
  final byte[] key = dataRowKey(pk);

  // Take the cached copy out of the cache rather than just reading it. Once
  // the row is rewritten below the cached bytes are stale; using them as
  // the "old" object in a later upsert could skip writing an index row that
  // the pruning step of this upsert removed.
  T oldObj;
  byte[] oldBin = cache().remove(pk);
  if (oldBin != null) {
    oldObj = getObjectCodec().decode(oldBin);
  } else if (mustExist) {
    oldBin = db.fetchRow(key);
    if (oldBin != null) {
      oldObj = getObjectCodec().decode(oldBin);
    } else {
      throw new OrmConcurrencyException();
    }
  } else {
    oldObj = null;
  }

  // New index rows first, then the data row, and only then drop the index
  // rows that no longer apply.
  writeNewIndexes(oldObj, newObj);
  db.upsert(key, getObjectCodec().encodeToByteString(newObj).toByteArray());
  pruneOldIndexes(oldObj, newObj);
}
/**
 * Insert secondary index rows for an object about to be written.
 * <p>
 * Insert or update operations should invoke this method before the main data
 * row is written, allowing the secondary index rows to be put into the data
 * store before the main data row arrives. Compatible scan implementations
 * (such as {@link #scanIndex(IndexFunction, byte[], byte[], int, boolean)}
 * above) will ignore these rows for a short time period.
 *
 * @param oldObj an old copy of the object; if non-null this may be used to
 *        avoid writing unnecessary secondary index rows that already exist.
 * @param newObj the new (or updated) object being stored. Must not be null.
 * @throws OrmException the data store is unable to update an index row.
 */
protected void writeNewIndexes(T oldObj, T newObj) throws OrmException {
  final byte[] rowData = indexRowData(newObj);
  for (IndexFunction<T> idx : getIndexes()) {
    if (!idx.includes(newObj)) {
      continue;
    }

    final byte[] rowKey = indexRowKey(idx, newObj);
    // The row is already in place when the old copy maps to the same key.
    final boolean alreadyIndexed =
        oldObj != null && matches(idx, oldObj, rowKey);
    if (!alreadyIndexed) {
      db.upsert(rowKey, rowData);
    }
  }
}
/**
 * Remove old secondary index rows that are no longer valid for an object.
 *
 * @param oldObj an old copy of the object, prior to the current update taking
 *        place. If null the method does nothing and simply returns.
 * @param newObj the new copy of the object. Index rows that are still valid
 *        for {@code newObj} are left alone. If null, all index rows for
 *        {@code oldObj} are removed.
 * @throws OrmException the data store is unable to remove an index row.
 */
protected void pruneOldIndexes(final T oldObj, T newObj) throws OrmException {
  if (oldObj == null) {
    return;
  }

  for (IndexFunction<T> idx : getIndexes()) {
    if (!idx.includes(oldObj)) {
      continue;
    }

    final byte[] staleKey = indexRowKey(idx, oldObj);
    // Keep the row only when the new copy still maps to the very same key.
    final boolean stillValid =
        newObj != null && matches(idx, newObj, staleKey);
    if (!stillValid) {
      db.delete(staleKey);
    }
  }
}
/**
 * Delete entities, then flush the schema.
 * <p>
 * For each entity the data row is deleted, the index rows derived from the
 * supplied instance are removed, and its entry in {@link #cache()} is
 * dropped.
 *
 * @param instances the entities to delete.
 * @throws OrmException one of the deletes, or the final flush, failed.
 */
@Override
public void delete(Iterable<T> instances) throws OrmException {
  for (T victim : instances) {
    final K pk = primaryKey(victim);
    db.delete(dataRowKey(pk));
    pruneOldIndexes(victim, null);
    cache().remove(pk);
  }
  db.flush();
}
/**
 * Atomically read, modify and write back a single entity.
 * <p>
 * The schema hands the current row bytes to a callback. The callback decodes
 * them, applies {@code update}, writes any new secondary index rows and
 * returns the re-encoded object to be stored. Old index rows are pruned
 * after the schema-level update has returned.
 *
 * @param key the primary key of the entity to update.
 * @param update the modification to apply to the decoded entity. It is
 *        expected to modify the instance it is given, as that instance is
 *        what gets stored.
 * @return the value returned by {@code update}; null if no row exists for
 *         {@code key}.
 * @throws OrmException the update failed, or an index row could not be
 *         written.
 */
@Override
public T atomicUpdate(K key, final AtomicUpdate<T> update)
    throws OrmException {
  // Use the same (overridable) key derivation as every other data row
  // access, so a subclass that customizes dataRowKey stays consistent.
  final byte[] rowKey = dataRowKey(key);

  // Whatever is cached for this key is about to become stale; drop it up
  // front so it is gone whether or not the update below succeeds.
  cache().remove(key);

  // NOTE(review): unlike insert/update/upsert/delete this method never calls
  // db.flush(); confirm the index writes issued here do not rely on one.
  try {
    // Slots: [0] value returned by the caller's update, [1] the object as it
    // was before the update, [2] the object as stored after the update.
    @SuppressWarnings("unchecked")
    final T[] res = (T[]) new Object[3];
    db.atomicUpdate(rowKey, new AtomicUpdate<byte[]>() {
      @Override
      public byte[] update(byte[] data) {
        if (data != null) {
          // Decode twice to get two independent instances: one is kept
          // untouched as the old state, the other is handed to the caller.
          final T oldObj = getObjectCodec().decode(data);
          final T newObj = getObjectCodec().decode(data);
          res[0] = update.update(newObj);
          res[1] = oldObj;
          res[2] = newObj;
          try {
            writeNewIndexes(oldObj, newObj);
          } catch (OrmException err) {
            // The callback signature cannot throw OrmException; tunnel it
            // out as an unchecked exception and unwrap it below.
            throw new IndexException(err);
          }
          return getObjectCodec().encodeToByteString(newObj).toByteArray();
        } else {
          res[0] = null;
          return null;
        }
      }
    });
    if (res[0] != null) {
      pruneOldIndexes(res[1], res[2]);
    }
    return res[0];
  } catch (IndexException err) {
    throw err.cause;
  }
}
/**
 * Determine if an object still matches the index row.
 * <p>
 * The object must still be included by the index defined by {@code f}, and
 * the index row key formatted for it must be identical to {@code exp}.
 *
 * @param f the function that defines the index.
 * @param obj the object instance being tested; must not be null.
 * @param exp the index row key, as scanned from the index.
 * @return true if the object still matches the data encoded in {@code exp}.
 */
protected boolean matches(IndexFunction<T> f, T obj, byte[] exp) {
  if (!f.includes(obj)) {
    return false;
  }
  final byte[] actual = indexRowKey(f, obj);
  return Arrays.equals(exp, actual);
}
/**
 * Generate the row key for the object's primary data row.
 * <p>
 * The default implementation uses the relation name, a delimiter, and then
 * the encoded primary key.
 *
 * @param key key of the object.
 * @return the object's data row key.
 */
protected byte[] dataRowKey(K key) {
  final IndexKeyBuilder rowKey = new IndexKeyBuilder();
  rowKey.add(getRelationName());
  rowKey.delimiter();
  encodePrimaryKey(rowKey, key);
  return rowKey.toByteArray();
}
/**
 * Generate the row key for an object's secondary index row.
 * <p>
 * The default implementation uses the relation name, '.', the index name, a
 * delimiter, the indexed fields encoded, a delimiter, and then the encoded
 * primary key (without the relation name prefix).
 * <p>
 * The object's primary key is always appended onto the end of the secondary
 * index row key to ensure that objects with the same field values still get
 * distinct rows in the secondary index.
 *
 * @param idx function that describes the index.
 * @param obj the object the index record should reference.
 * @return the encoded secondary index row key.
 */
protected byte[] indexRowKey(IndexFunction<T> idx, T obj) {
  final IndexKeyBuilder rowKey = new IndexKeyBuilder();

  // "<relation>.<index>" prefix.
  rowKey.add(getRelationName());
  rowKey.add('.');
  rowKey.add(idx.getName());
  rowKey.delimiter();

  // Indexed field values.
  idx.encode(rowKey, obj);
  rowKey.delimiter();

  // Primary key suffix keeps rows with equal field values distinct.
  encodePrimaryKey(rowKey, primaryKey(obj));
  return rowKey.toByteArray();
}
/**
 * Generate the data to store in a secondary index row for an object.
 * <p>
 * The default implementation of this method stores the encoded primary key,
 * and the current system timestamp.
 *
 * @param obj the object the index record should reference.
 * @return the encoded secondary index row data.
 */
protected byte[] indexRowData(T obj) {
  final long timestamp = System.currentTimeMillis();

  final IndexKeyBuilder pk = new IndexKeyBuilder();
  encodePrimaryKey(pk, primaryKey(obj));

  final IndexRow row = IndexRow.forKey(timestamp, pk.toByteArray());
  return IndexRow.CODEC.encodeToByteArray(row);
}
/**
 * Unchecked wrapper used to carry an {@link OrmException} out of a callback
 * that cannot declare it; the catching side rethrows {@link #cause}.
 */
@SuppressWarnings("serial")
private static class IndexException extends RuntimeException {
  /** The wrapped exception, kept with its static type for rethrowing. */
  final OrmException cause;

  IndexException(OrmException wrapped) {
    super(wrapped);
    cause = wrapped;
  }
}
}