| // Copyright 2010 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.gwtorm.nosql.generic; |
| |
| import com.google.gwtorm.nosql.CounterShard; |
| import com.google.gwtorm.nosql.IndexKeyBuilder; |
| import com.google.gwtorm.nosql.IndexRow; |
| import com.google.gwtorm.nosql.NoSqlSchema; |
| import com.google.gwtorm.schema.SequenceModel; |
| import com.google.gwtorm.server.AtomicUpdate; |
| import com.google.gwtorm.server.ListResultSet; |
| import com.google.gwtorm.server.OrmDuplicateKeyException; |
| import com.google.gwtorm.server.OrmException; |
| import com.google.gwtorm.server.ResultSet; |
| import com.google.gwtorm.server.Schema; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| /** |
| * Base implementation for {@link Schema} in a {@link GenericDatabase}. |
| * <p> |
| * NoSQL implementors must extend this class and provide implementations for the |
| * abstract methods declared here. Each schema instance will wrap one thread's |
| * connection to the data store. Therefore, unlike database, this class does not |
| * need to be thread-safe. |
| */ |
| public abstract class GenericSchema extends NoSqlSchema { |
| private final GenericDatabase<?, ?, ?> db; |
| |
| protected GenericSchema(final GenericDatabase<?, ?, ?> d) { |
| super(d); |
| db = d; |
| } |
| |
| public void flush() throws OrmException { |
| } |
| |
| /** @return the database that created this schema instance. */ |
| public GenericDatabase<?, ?, ?> getDatabase() { |
| return db; |
| } |
| |
| /** |
| * Allocate a new unique value from a pool of values. |
| * <p> |
| * This method is only required to return a unique value per invocation. |
| * Implementors may override the method to provide an implementation that |
| * returns values out of order. |
| * <p> |
| * The default implementation of this method stores a {@link CounterShard} |
| * under the row key {@code ".sequence." + poolName}, and updates it through |
| * the atomic semantics of {@link #atomicUpdate(byte[], AtomicUpdate)}. If the |
| * row does not yet exist, it is initialized and the value 1 is returned. |
| * |
| * @param poolName name of the value pool to allocate from. This is typically |
| * the name of a sequence in the schema. |
| * @return a new unique value. |
| * @throws OrmException a unique value cannot be obtained. |
| */ |
| @Override |
| protected long nextLong(final String poolName) throws OrmException { |
| IndexKeyBuilder b = new IndexKeyBuilder(); |
| b.add(".sequence." + poolName); |
| b.delimiter(); |
| try { |
| final long[] res = new long[1]; |
| atomicUpdate(b.toByteArray(), new AtomicUpdate<byte[]>() { |
| @Override |
| public byte[] update(byte[] val) { |
| CounterShard ctr; |
| if (val != null) { |
| ctr = CounterShard.CODEC.decode(val); |
| } else { |
| long start = 1; |
| for (SequenceModel s : getDatabase().getSchemaModel() |
| .getSequences()) { |
| if (poolName.equals(s.getSequenceName())) { |
| start = s.getSequence().startWith(); |
| if (start == 0) { |
| start = 1; |
| } |
| break; |
| } |
| } |
| ctr = new CounterShard(start, Long.MAX_VALUE); |
| } |
| |
| if (ctr.isEmpty()) { |
| throw new NoMoreValues(); |
| } |
| |
| res[0] = ctr.next(); |
| return CounterShard.CODEC.encodeToByteString(ctr).toByteArray(); |
| } |
| }); |
| return res[0]; |
| } catch (NoMoreValues err) { |
| throw new OrmException("Counter '" + poolName + "' out of values"); |
| } |
| } |
| |
| /** |
| * Fetch one row's data. |
| * <p> |
| * The default implementation of this method creates a pair of keys and passes |
| * them to {@link #scan(byte[], byte[], int, boolean)}. The {@code fromKey} is |
| * the supplied {@code key}, while the {@code toKey} has '\0' appended onto |
| * {@code key}. If more than one row matches in that range, the method throws |
| * an exception. |
| * |
| * @param key key of the row to fetch and return. |
| * @return the data stored under {@code key}; null if no row exists. |
| * @throws OrmDuplicateKeyException more than one row was identified in the |
| * key scan. |
| * @throws OrmException the data store cannot process the request. |
| */ |
| public byte[] fetchRow(byte[] key) throws OrmDuplicateKeyException, |
| OrmException { |
| final byte[] fromKey = key; |
| final byte[] toKey = new byte[key.length + 1]; |
| System.arraycopy(key, 0, toKey, 0, key.length); |
| |
| ResultSet<Row> r = scan(fromKey, toKey, 2, false); |
| try { |
| Iterator<Row> i = r.iterator(); |
| if (!i.hasNext()) { |
| return null; |
| } |
| |
| byte[] data = i.next().getValue(); |
| if (i.hasNext()) { |
| throw new OrmDuplicateKeyException("Unexpected duplicate keys"); |
| } |
| return data; |
| } finally { |
| r.close(); |
| } |
| } |
| |
| /** |
| * Fetch multiple rows at once. |
| * <p> |
| * The default implementation of this method is a simple iteration over each |
| * key and executes a sequential fetch with {@link #fetchRow(byte[])}. |
| * |
| * @param keys keys to fetch and return. |
| * @return iteration over the rows that exist and appear in {@code keys}. |
| * @throws OrmException the data store cannot process the request. |
| */ |
| public ResultSet<Row> fetchRows(Iterable<byte[]> keys) throws OrmException { |
| List<Row> r = new ArrayList<Row>(); |
| for (byte[] key : keys) { |
| byte[] val = fetchRow(key); |
| if (val != null) { |
| r.add(new Row(key, val)); |
| } |
| } |
| return new ListResultSet<Row>(r); |
| } |
| |
| /** |
| * Scan a range of keys and return any matching objects. |
| * <p> |
| * To fetch a single record with a scan, set {@code toKey} to the same array |
| * as {@code fromKey}, but append a trailing NUL byte (0x00). The caller |
| * should validate that the returned ResultSet contains no more than 1 row. |
| * <p> |
| * The resulting iteration does not support remove. |
| * <p> |
| * Each iteration element is a map entry, describing the row key and the row |
| * value. The map entry's value cannot be changed. |
| * |
| * @param fromKey key to start the scan on. This is inclusive. |
| * @param toKey key to stop the scan on. This is exclusive. |
| * @param limit maximum number of results to return. |
| * @param order if true the order will be preserved, false if the result order |
| * order can be arbitrary. |
| * @return result iteration for the requested range. The result set may be |
| * lazily filled, or filled completely. |
| * @throws OrmException an error occurred preventing the scan from completing. |
| */ |
| public abstract ResultSet<Row> scan(byte[] fromKey, byte[] toKey, int limit, |
| boolean order) throws OrmException; |
| |
| /** |
| * Atomically insert one row, failing if the row already exists. |
| * <p> |
| * The default implementation of this method relies upon the atomic nature of |
| * the {@link #atomicUpdate(byte[], AtomicUpdate)} primitive to test for the |
| * row's existence, and create the row only if it is not found. |
| * |
| * @param key key of the new row to insert. |
| * @param newData data of the new row. |
| * @throws OrmDuplicateKeyException another row already exists with the |
| * specified key. |
| * @throws OrmException the data store cannot process the request right now, |
| * for example due to a network connectivity problem. |
| */ |
| public void insert(byte[] key, final byte[] newData) |
| throws OrmDuplicateKeyException, OrmException { |
| try { |
| atomicUpdate(key, new AtomicUpdate<byte[]>() { |
| @Override |
| public byte[] update(byte[] oldData) { |
| if (oldData != null) { |
| throw new KeyExists(); |
| } |
| return newData; |
| } |
| }); |
| } catch (KeyExists err) { |
| throw new OrmDuplicateKeyException("Duplicate key"); |
| } |
| } |
| |
| /** |
| * Update a single row, inserting it if it does not exist. |
| * <p> |
| * Unlike insert, this method always succeeds. |
| * |
| * @param key key of the row to update, or insert if missing. |
| * @param data data to store at this row. |
| * @throws OrmException the data store cannot process the request, for example |
| * due to a network connectivity problem. |
| */ |
| public abstract void upsert(byte[] key, byte[] data) throws OrmException; |
| |
| /** |
| * Delete the row stored under the given key. |
| * <p> |
| * If the row does not exist, this method must complete successfully anyway. |
| * The intent of the caller is to ensure the row does not exist when the |
| * method completes, and a row that did not exist satisfies that intent. |
| * |
| * @param key the key to delete. |
| * @throws OrmException the data store cannot perform the removal. |
| */ |
| public abstract void delete(byte[] key) throws OrmException; |
| |
| /** |
| * Atomically read and update a single row. |
| * <p> |
| * Unlike schema's atomicUpdate() method, this method must handle missing |
| * rows. Implementations must be logically equivalent to the following, but |
| * performed atomically within the scope of the single row key: |
| * |
| * <pre> |
| * byte[] oldData = get(key); |
| * byte[] newData = update.update(oldData); |
| * if (newData != null) { |
| * upsert(key, newData); |
| * } else if (oldData != null) { |
| * remove(key); |
| * } |
| * return data; |
| * </pre> |
| * <p> |
| * Secondary index row updates are assumed to never be part of the atomic |
| * update transaction. This is an intentional design decision to fit with many |
| * NoSQL product's limitations to support only single-row atomic updates. |
| * <p> |
| * The {@code update} method may be invoked multiple times before the |
| * operation is considered successful. This permits an implementation to |
| * perform an opportunistic update attempt, and retry the update if the same |
| * row was modified by another concurrent worker. |
| * |
| * @param key the row key to read, update and return. |
| * @param update action to perform on the row's data element. The action may |
| * be passed null if the row doesn't exist. |
| * @throws OrmException the database cannot perform the update. |
| */ |
| public abstract void atomicUpdate(byte[] key, AtomicUpdate<byte[]> update) |
| throws OrmException; |
| |
| /** |
| * Check (and delete) an index row if its a fossil. |
| * <p> |
| * As index rows are written ahead of the main data row being written out, |
| * scans sometimes see an index row that does not match the data row. These |
| * are ignored for a short period ({@link GenericDatabase#getMaxFossilAge()}) |
| * to allow the primary data row to eventually get written out. If however the |
| * writer never finished the update, these index rows are stale and need to be |
| * pruned. Any index row older than the fossil age is removed by this method. |
| * |
| * @param now timestamp when the current scan started. |
| * @param key the index row key. |
| * @param row the index row data. |
| */ |
| public void maybeFossilCollectIndexRow(long now, byte[] key, IndexRow row) { |
| if (row.getTimestamp() + db.getMaxFossilAge() <= now) { |
| fossilCollectIndexRow(key, row); |
| } |
| } |
| |
| /** |
| * Delete the given fossil index row. |
| * <p> |
| * This method is logically the same as {@link #delete(byte[])}, but its |
| * separated out to permit asynchronous delivery of the delete events since |
| * these are arriving during an index scan and are less time-critical than |
| * other delete operations. |
| * <p> |
| * The default implementation of this method calls {@link #delete(byte[])}. |
| * |
| * @param key index key to remove. |
| * @param row the index row data. |
| */ |
| protected void fossilCollectIndexRow(byte[] key, IndexRow row) { |
| try { |
| delete(key); |
| } catch (OrmException e) { |
| // Ignore a fossil delete error. |
| } |
| } |
| |
| @SuppressWarnings("serial") |
| private static class KeyExists extends RuntimeException { |
| } |
| |
| @SuppressWarnings("serial") |
| private static class NoMoreValues extends RuntimeException { |
| } |
| } |