| // Copyright 2010 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.gwtorm.nosql.generic; |
| |
| import com.google.gwtorm.nosql.CounterShard; |
| import com.google.gwtorm.nosql.IndexKeyBuilder; |
| import com.google.gwtorm.nosql.IndexRow; |
| import com.google.gwtorm.nosql.NoSqlSchema; |
| import com.google.gwtorm.schema.SequenceModel; |
| import com.google.gwtorm.server.AtomicUpdate; |
| import com.google.gwtorm.server.ListResultSet; |
| import com.google.gwtorm.server.OrmDuplicateKeyException; |
| import com.google.gwtorm.server.OrmException; |
| import com.google.gwtorm.server.ResultSet; |
| import com.google.gwtorm.server.Schema; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| /** |
| * Base implementation for {@link Schema} in a {@link GenericDatabase}. |
| * |
| * <p>NoSQL implementors must extend this class and provide implementations for the abstract methods |
| * declared here. Each schema instance will wrap one thread's connection to the data store. |
| * Therefore, unlike the shared database instance, this class does not need to be thread-safe. |
| */ |
| public abstract class GenericSchema extends NoSqlSchema { |
| private final GenericDatabase<?, ?, ?> db; |
| |
| protected GenericSchema(final GenericDatabase<?, ?, ?> d) { |
| super(d); |
| db = d; |
| } |
| |
| public void flush() {} |
| |
| /** @return the database that created this schema instance. */ |
| public GenericDatabase<?, ?, ?> getDatabase() { |
| return db; |
| } |
| |
| /** |
| * Allocate a new unique value from a pool of values. |
| * |
| * <p>This method is only required to return a unique value per invocation. Implementors may |
| * override the method to provide an implementation that returns values out of order. |
| * |
| * <p>The default implementation of this method stores a {@link CounterShard} under the row key |
| * {@code ".sequence." + poolName}, and updates it through the atomic semantics of {@link |
| * #atomicUpdate(byte[], AtomicUpdate)}. If the row does not yet exist, it is initialized and the |
| * value 1 is returned. |
| * |
| * @param poolName name of the value pool to allocate from. This is typically the name of a |
| * sequence in the schema. |
| * @return a new unique value. |
| * @throws OrmException a unique value cannot be obtained. |
| */ |
| @Override |
| protected long nextLong(final String poolName) throws OrmException { |
| IndexKeyBuilder b = new IndexKeyBuilder(); |
| b.add(".sequence." + poolName); |
| b.delimiter(); |
| try { |
| final long[] res = new long[1]; |
| atomicUpdate( |
| b.toByteArray(), |
| new AtomicUpdate<byte[]>() { |
| @Override |
| public byte[] update(byte[] val) { |
| CounterShard ctr; |
| if (val != null) { |
| ctr = CounterShard.CODEC.decode(val); |
| } else { |
| long start = 1; |
| for (SequenceModel s : getDatabase().getSchemaModel().getSequences()) { |
| if (poolName.equals(s.getSequenceName())) { |
| start = s.getSequence().startWith(); |
| if (start == 0) { |
| start = 1; |
| } |
| break; |
| } |
| } |
| ctr = new CounterShard(start, Long.MAX_VALUE); |
| } |
| |
| if (ctr.isEmpty()) { |
| throw new NoMoreValues(); |
| } |
| |
| res[0] = ctr.next(); |
| return CounterShard.CODEC.encodeToByteString(ctr).toByteArray(); |
| } |
| }); |
| return res[0]; |
| } catch (NoMoreValues err) { |
| throw new OrmException("Counter '" + poolName + "' out of values"); |
| } |
| } |
| |
| /** |
| * Fetch one row's data. |
| * |
| * <p>The default implementation of this method creates a pair of keys and passes them to {@link |
| * #scan(byte[], byte[], int, boolean)}. The {@code fromKey} is the supplied {@code key}, while |
| * the {@code toKey} has '\0' appended onto {@code key}. If more than one row matches in that |
| * range, the method throws an exception. |
| * |
| * @param key key of the row to fetch and return. |
| * @return the data stored under {@code key}; null if no row exists. |
| * @throws OrmDuplicateKeyException more than one row was identified in the key scan. |
| * @throws OrmException the data store cannot process the request. |
| */ |
| public byte[] fetchRow(byte[] key) throws OrmDuplicateKeyException, OrmException { |
| final byte[] fromKey = key; |
| final byte[] toKey = new byte[key.length + 1]; |
| System.arraycopy(key, 0, toKey, 0, key.length); |
| |
| ResultSet<Row> r = scan(fromKey, toKey, 2, false); |
| try { |
| Iterator<Row> i = r.iterator(); |
| if (!i.hasNext()) { |
| return null; |
| } |
| |
| byte[] data = i.next().getValue(); |
| if (i.hasNext()) { |
| throw new OrmDuplicateKeyException("Unexpected duplicate keys"); |
| } |
| return data; |
| } finally { |
| r.close(); |
| } |
| } |
| |
| /** |
| * Fetch multiple rows at once. |
| * |
| * <p>The default implementation of this method is a simple iteration over each key and executes a |
| * sequential fetch with {@link #fetchRow(byte[])}. |
| * |
| * @param keys keys to fetch and return. |
| * @return iteration over the rows that exist and appear in {@code keys}. |
| * @throws OrmException the data store cannot process the request. |
| */ |
| public ResultSet<Row> fetchRows(Iterable<byte[]> keys) throws OrmException { |
| List<Row> r = new ArrayList<>(); |
| for (byte[] key : keys) { |
| byte[] val = fetchRow(key); |
| if (val != null) { |
| r.add(new Row(key, val)); |
| } |
| } |
| return new ListResultSet<>(r); |
| } |
| |
| /** |
| * Scan a range of keys and return any matching objects. |
| * |
| * <p>To fetch a single record with a scan, set {@code toKey} to the same array as {@code |
| * fromKey}, but append a trailing NUL byte (0x00). The caller should validate that the returned |
| * ResultSet contains no more than 1 row. |
| * |
| * <p>The resulting iteration does not support remove. |
| * |
| * <p>Each iteration element is a map entry, describing the row key and the row value. The map |
| * entry's value cannot be changed. |
| * |
| * @param fromKey key to start the scan on. This is inclusive. |
| * @param toKey key to stop the scan on. This is exclusive. |
| * @param limit maximum number of results to return. |
| * @param order if true the order will be preserved, false if the result order order can be |
| * arbitrary. |
| * @return result iteration for the requested range. The result set may be lazily filled, or |
| * filled completely. |
| * @throws OrmException an error occurred preventing the scan from completing. |
| */ |
| public abstract ResultSet<Row> scan(byte[] fromKey, byte[] toKey, int limit, boolean order) |
| throws OrmException; |
| |
| /** |
| * Atomically insert one row, failing if the row already exists. |
| * |
| * <p>The default implementation of this method relies upon the atomic nature of the {@link |
| * #atomicUpdate(byte[], AtomicUpdate)} primitive to test for the row's existence, and create the |
| * row only if it is not found. |
| * |
| * @param key key of the new row to insert. |
| * @param newData data of the new row. |
| * @throws OrmDuplicateKeyException another row already exists with the specified key. |
| * @throws OrmException the data store cannot process the request right now, for example due to a |
| * network connectivity problem. |
| */ |
| public void insert(byte[] key, final byte[] newData) |
| throws OrmDuplicateKeyException, OrmException { |
| try { |
| atomicUpdate( |
| key, |
| new AtomicUpdate<byte[]>() { |
| @Override |
| public byte[] update(byte[] oldData) { |
| if (oldData != null) { |
| throw new KeyExists(); |
| } |
| return newData; |
| } |
| }); |
| } catch (KeyExists err) { |
| throw new OrmDuplicateKeyException("Duplicate key"); |
| } |
| } |
| |
| /** |
| * Update a single row, inserting it if it does not exist. |
| * |
| * <p>Unlike insert, this method always succeeds. |
| * |
| * @param key key of the row to update, or insert if missing. |
| * @param data data to store at this row. |
| * @throws OrmException the data store cannot process the request, for example due to a network |
| * connectivity problem. |
| */ |
| public abstract void upsert(byte[] key, byte[] data) throws OrmException; |
| |
| /** |
| * Delete the row stored under the given key. |
| * |
| * <p>If the row does not exist, this method must complete successfully anyway. The intent of the |
| * caller is to ensure the row does not exist when the method completes, and a row that did not |
| * exist satisfies that intent. |
| * |
| * @param key the key to delete. |
| * @throws OrmException the data store cannot perform the removal. |
| */ |
| public abstract void delete(byte[] key) throws OrmException; |
| |
| /** |
| * Atomically read and update a single row. |
| * |
| * <p>Unlike schema's atomicUpdate() method, this method must handle missing rows. Implementations |
| * must be logically equivalent to the following, but performed atomically within the scope of the |
| * single row key: |
| * |
| * <pre> |
| * byte[] oldData = get(key); |
| * byte[] newData = update.update(oldData); |
| * if (newData != null) { |
| * upsert(key, newData); |
| * } else if (oldData != null) { |
| * remove(key); |
| * } |
| * return data; |
| * </pre> |
| * |
| * <p>Secondary index row updates are assumed to never be part of the atomic update transaction. |
| * This is an intentional design decision to fit with many NoSQL product's limitations to support |
| * only single-row atomic updates. |
| * |
| * <p>The {@code update} method may be invoked multiple times before the operation is considered |
| * successful. This permits an implementation to perform an opportunistic update attempt, and |
| * retry the update if the same row was modified by another concurrent worker. |
| * |
| * @param key the row key to read, update and return. |
| * @param update action to perform on the row's data element. The action may be passed null if the |
| * row doesn't exist. |
| * @throws OrmException the database cannot perform the update. |
| */ |
| public abstract void atomicUpdate(byte[] key, AtomicUpdate<byte[]> update) throws OrmException; |
| |
| /** |
| * Check (and delete) an index row if its a fossil. |
| * |
| * <p>As index rows are written ahead of the main data row being written out, scans sometimes see |
| * an index row that does not match the data row. These are ignored for a short period ({@link |
| * GenericDatabase#getMaxFossilAge()}) to allow the primary data row to eventually get written |
| * out. If however the writer never finished the update, these index rows are stale and need to be |
| * pruned. Any index row older than the fossil age is removed by this method. |
| * |
| * @param now timestamp when the current scan started. |
| * @param key the index row key. |
| * @param row the index row data. |
| */ |
| public void maybeFossilCollectIndexRow(long now, byte[] key, IndexRow row) { |
| if (row.getTimestamp() + db.getMaxFossilAge() <= now) { |
| fossilCollectIndexRow(key, row); |
| } |
| } |
| |
| /** |
| * Delete the given fossil index row. |
| * |
| * <p>This method is logically the same as {@link #delete(byte[])}, but its separated out to |
| * permit asynchronous delivery of the delete events since these are arriving during an index scan |
| * and are less time-critical than other delete operations. |
| * |
| * <p>The default implementation of this method calls {@link #delete(byte[])}. |
| * |
| * @param key index key to remove. |
| * @param row the index row data. |
| */ |
| protected void fossilCollectIndexRow(byte[] key, IndexRow row) { |
| try { |
| delete(key); |
| } catch (OrmException e) { |
| // Ignore a fossil delete error. |
| } |
| } |
| |
| @SuppressWarnings("serial") |
| private static class KeyExists extends RuntimeException {} |
| |
| @SuppressWarnings("serial") |
| private static class NoMoreValues extends RuntimeException {} |
| } |