helenus/NOTES

317 lines
11 KiB
Text

--- Cache
// `E` is the type of the Entity class or one of:
// - ResultSet
// - ArrayTuple{N}
// - Count
// `F` is the type argument passed to us from HelenusSession DSL and carried on via one of the
// Operation classes, it is going to be one of:
// - ResultSet
// - ArrayTuple{N}
// - or a type previously registered as a HelenusEntity.
// In the form of a:
// - Stream<?> or an
// - Optional<?>
//
// Operation/
// |-- AbstractStatementOperation
// | |-- AbstractOperation
// | | |-- AbstractFilterOperation
// | | | |-- CountOperation
// | | | |-- DeleteOperation
// | | | `-- UpdateOperation
// | | |-- BoundOperation
// | | `-- InsertOperation
// | |-- AbstractOptionalOperation
// | | |-- AbstractFilterOptionalOperation
// | | | |-- SelectFirstOperation
// | | | `-- SelectFirstTransformingOperation
// | | `-- BoundOptionalOperation
// | `-- AbstractStreamOperation
// | |-- AbstractFilterStreamOperation
// | | |-- SelectOperation
// | | `-- SelectTransformingOperation
// | `-- BoundStreamOperation
// |-- PreparedOperation
// |-- PreparedOptionalOperation
// `-- PreparedStreamOperation
//
// These all boil down to: Select, Update, Insert, Delete and Count
//
// -- Select:
// 1) Select statements that contain all primary key information will be "distinct" and
// result in a single value or no match.
// If present, return cached entity otherwise execute query and cache result.
//
// 2) Otherwise the result is a set, possibly empty, of values that match.
// When within a UOW:
// If present, return the cached value(s) from the statement cache matching the query string.
// Otherwise, execute query and cache the result in the statement cache and update/merge the
// entites into the entity cache.
// NOTE: When we read data from the database we augment the select clause with TTL and write time
// stamps for all columns that record such information so as to be able to properlty expire
// and merge values in the cache.
//
// -- Update:
// Execute the database statement and then iff successs upsert the entity being updated into the
// entity cache.
//
// -- Insert/Upsert:
// Same as Update.
//
// -- Delete:
// Same as update, only remove the cached value from all caches on success.
//
// -- Count:
// If operating within a UOW lookup count in statement cache, if not present execute query and cache result.
//
if (delegate instanceof SelectOperation) {
SelectOperation<E> op = (SelectOperation<E>) delegate;
// Determine if we are caching and if so where.
AbstractCache<CacheKey, Set<E>> cache = delegate.getCache();
boolean prepareStatementForCaching = cache != null;
if (uow != null) {
prepareStatementForCaching = true;
cache = uow.<Set<E>>getCacheEnclosing(cache);
}
// The delegate will provide the cache key becuase it will either be:
// a) when distinct: the combination of the partition/cluster key columns
// b) otherwise: the table name followed by the portion of the SQL statement that would form the WHERE clause
CacheKey key = (cache == null) ? null : delegate.getCacheKey();
if (key != null && cache != null) {
Set<E> value = cache.get(key);
if (value != null) {
// Select will always return a Stream<E>
// TODO(gburd): SelectTransforming... apply fn here?
result = (E) value.stream();
if (cacheHitCounter != null) {
cacheHitCounter.inc();
}
if (log != null) {
log.info("cache hit");
}
return result;
} else {
if (cacheMissCounter != null) {
cacheMissCounter.inc();
}
if (log != null) {
log.info("cache miss");
}
}
}
}
if (cache != null) {
Object obj = delegate.unwrap(result);
if (obj != null) {
cache.put(key, obj);
}
delegate.<E>extract(result, key, cache);
}
}
}
// TODO: first, ask the delegate for the cacheKey
// if this is a SELECT query:
// if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations
// if INSERT/UPSERT/UPDATE
// if DELETE
// if COUNT
----------------------------
@Override
public CacheKey getCacheKey() {
List<String>keys = new ArrayList<>(filters.size());
HelenusEntity entity = props.get(0).getEntity();
for (HelenusPropertyNode prop : props) {
switch(prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
Filter filter = filters.get(prop.getProperty());
if (filter != null) {
keys.add(filter.toString());
} else {
// we're missing a part of the primary key, so we can't create a proper cache key
return null;
}
break;
default:
// We've past the primary key components in this ordered list, so we're done building
// the cache key.
if (keys.size() > 0) {
return new CacheKey(entity, Joiner.on(",").join(keys));
}
return null;
}
}
return null;
}
---------------------------
// TODO(gburd): create a statement that matches one that wasn't prepared
//String key =
// "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
//for (Object param : params) {
// key = key.replaceFirst(Pattern.quote("?"), param.toString());
//}
------------------------
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public abstract class AbstractCache<K, V> {
final Logger logger = LoggerFactory.getLogger(getClass());
public Cache<K, V> cache;
public AbstractCache() {
RemovalListener<K, V> listener =
new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> n) {
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
}
V get(K key) {
return cache.getIfPresent(key);
}
void put(K key, V value) {
cache.put(key, value);
}
}
------------------------------------------------------------------------------------------------
cache entites (2 methods) marked @Cacheable
cache entites in txn context
cache results when .cache() chained before .{a}sync() call, return a EvictableCacheItem<E> that has an .evict() method
fix txn .andThen() chains
primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...)
create table wal {
id timeuuid,
follows timeuuid,
read <Counting Quotient Filter, Set<{keyspace, col, schema generation, timestamp}>>
write <Counting Quotient Filter, Set<{keyspace, col, schema generation, timestamp}>>
primary key (id, follows)
}
begin:
- insert into wal (timeuuid, parent timeuuid,
// NOTE: Update operations have no meaning when they only contain primary key components, so
// given that `properties` is ordered with the keys first if we find that the last element
// is either a partition key or clustering column then we know we should just skip this operation.
ColumnType ct = ((HelenusProperty) properties.toArray()[properties.size() - 1]).getColumnType();
if (ct != ColumnType.PARTITION_KEY && ct != ColumnType.CLUSTERING_COLUMN) {
return;
}
public Stream<E> sync() {
ListenableFuture<Stream<E>> future = async();
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String contents) {
//...process web site contents
}
@Override
public void onFailure(Throwable throwable) {
log.error("Exception in task", throwable);
}
});
}
-------
private mergeCache(Map<String, Set<Object>>
private static <E> Iterable<E> concat(
Iterable<? extends E> i1,
Iterable<? extends E> i2) {
return new Iterable<E>() {
public Iterator<E> iterator() {
return new Iterator<E>() {
Iterator<? extends E> listIterator = i1.iterator();
Boolean checkedHasNext;
E nextValue;
private boolean startTheSecond;
void theNext() {
if (listIterator.hasNext()) {
checkedHasNext = true;
nextValue = listIterator.next();
} else if (startTheSecond)
checkedHasNext = false;
else {
startTheSecond = true;
listIterator = i2.iterator();
theNext();
}
}
public boolean hasNext() {
if (checkedHasNext == null)
theNext();
return checkedHasNext;
}
public E next() {
if (!hasNext())
throw new NoSuchElementException();
checkedHasNext = null;
return nextValue;
}
public void remove() {
listIterator.remove();
}
};
}
};
}