--- Cache
// `E` is the type of the Entity class or one of:
// - ResultSet
// - ArrayTuple{N}
// - Count
// `F` is the type argument passed to us from the HelenusSession DSL and carried on via one of the
// Operation classes; it is going to be one of:
// - ResultSet
// - ArrayTuple{N}
// - or a type previously registered as a HelenusEntity.
// In the form of a:
// - Stream<?> or an
// - Optional<?>
//
// Operation/
// |-- AbstractStatementOperation
// | |-- AbstractOperation
// | | |-- AbstractFilterOperation
// | | | |-- CountOperation
// | | | |-- DeleteOperation
// | | | `-- UpdateOperation
// | | |-- BoundOperation
// | | `-- InsertOperation
// | |-- AbstractOptionalOperation
// | | |-- AbstractFilterOptionalOperation
// | | | |-- SelectFirstOperation
// | | | `-- SelectFirstTransformingOperation
// | | `-- BoundOptionalOperation
// | `-- AbstractStreamOperation
// |   |-- AbstractFilterStreamOperation
// |   | |-- SelectOperation
// |   | `-- SelectTransformingOperation
// |   `-- BoundStreamOperation
// |-- PreparedOperation
// |-- PreparedOptionalOperation
// `-- PreparedStreamOperation
//
// These all boil down to: Select, Update, Insert, Delete, and Count.
//
// -- Select:
// 1) Select statements that contain all primary key information will be "distinct" and
//    result in a single value or no match.
//    If present, return the cached entity; otherwise execute the query and cache the result.
//
// 2) Otherwise the result is a set, possibly empty, of values that match.
//    When within a UOW:
//    If present, return the cached value(s) from the statement cache matching the query string.
//    Otherwise, execute the query, cache the result in the statement cache, and update/merge the
//    entities into the entity cache.
// NOTE: When we read data from the database we augment the select clause with TTL and write
// timestamps for all columns that record such information so as to be able to properly expire
// and merge values in the cache.
//
// -- Update:
// Execute the database statement and then, iff it succeeds, upsert the entity being updated into
// the entity cache.
//
// -- Insert/Upsert:
// Same as Update.
//
// -- Delete:
// Same as Update, only remove the cached value from all caches on success.
//
// -- Count:
// If operating within a UOW, look up the count in the statement cache; if not present, execute
// the query and cache the result.

if (delegate instanceof SelectOperation) {
  SelectOperation<E> op = (SelectOperation<E>) delegate;

  // Determine if we are caching and, if so, where.
  AbstractCache<CacheKey, Set<E>> cache = delegate.getCache();
  boolean prepareStatementForCaching = cache != null;
  if (uow != null) {
    prepareStatementForCaching = true;
    cache = uow.<Set<E>>getCacheEnclosing(cache);
  }

  // The delegate will provide the cache key because it will either be:
  // a) when distinct: the combination of the partition/cluster key columns
  // b) otherwise: the table name followed by the portion of the SQL statement that would form the WHERE clause
  CacheKey key = (cache == null) ? null : delegate.getCacheKey();
  if (key != null && cache != null) {
    Set<E> value = cache.get(key);
    if (value != null) {
      // Select will always return a Stream<E>
      // TODO(gburd): SelectTransforming... apply fn here?
      result = (E) value.stream();
      if (cacheHitCounter != null) {
        cacheHitCounter.inc();
      }
      if (log != null) {
        log.info("cache hit");
      }
      return result;
    } else {
      if (cacheMissCounter != null) {
        cacheMissCounter.inc();
      }
      if (log != null) {
        log.info("cache miss");
      }
    }
  }

  // On a cache miss, cache the unwrapped result and let the delegate merge the entities it
  // extracts into the entity cache.
  if (cache != null) {
    Object obj = delegate.unwrap(result);
    if (obj != null) {
      cache.put(key, (Set<E>) obj);
    }

    delegate.<E>extract(result, key, cache);
  }
}

// TODO: first, ask the delegate for the cacheKey
// if this is a SELECT query:
//   if not in cache: build the statement, execute the future, cache the result,
//   transform the result, then cache the transformations
// if INSERT/UPSERT/UPDATE
// if DELETE
// if COUNT

----------------------------

@Override
public CacheKey getCacheKey() {

  List<String> keys = new ArrayList<>(filters.size());
  HelenusEntity entity = props.get(0).getEntity();

  for (HelenusPropertyNode prop : props) {
    switch (prop.getProperty().getColumnType()) {
      case PARTITION_KEY:
      case CLUSTERING_COLUMN:
        Filter filter = filters.get(prop.getProperty());
        if (filter != null) {
          keys.add(filter.toString());
        } else {
          // We're missing a part of the primary key, so we can't create a proper cache key.
          return null;
        }
        break;
      default:
        // We've passed the primary key components in this ordered list, so we're done building
        // the cache key.
        if (keys.size() > 0) {
          return new CacheKey(entity, Joiner.on(",").join(keys));
        }
        return null;
    }
  }
  // Every selected property was a primary key component (the "distinct" case), so the key is
  // complete; returning null here would mean distinct selects are never cached.
  if (keys.size() > 0) {
    return new CacheKey(entity, Joiner.on(",").join(keys));
  }
  return null;
}
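
For illustration, the key text is just the filters joined with commas. A tiny runnable sketch, assuming (this is not verified against Filter.toString()) that a filter renders as "col == value":

import com.google.common.base.Joiner;
import java.util.Arrays;
import java.util.List;

public class CacheKeyExample {
  public static void main(String[] args) {
    // Hypothetical rendering of two primary key filters.
    List<String> keys = Arrays.asList("id == 1", "ts == 2");
    System.out.println(Joiner.on(",").join(keys)); // -> id == 1,ts == 2
  }
}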
|
|
|
|
---------------------------

// TODO(gburd): create a statement that matches one that wasn't prepared
//String key =
//    "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
//for (Object param : params) {
//  key = key.replaceFirst(Pattern.quote("?"), param.toString());
//}

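A minimal sketch of that idea as a standalone helper, assuming the statement text uses `?` positional markers and that String.valueOf(param) is an acceptable rendering (neither is guaranteed; real CQL values would need proper quoting/escaping):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class StatementKeys {
  // Replace each positional `?` marker with the corresponding bound value, in order,
  // to recover a cache key comparable to the un-prepared statement's text.
  static String inlineParams(String keyspace, String queryString, Object... params) {
    String key = "use " + keyspace + "; " + queryString;
    for (Object param : params) {
      // quoteReplacement guards against `$` and `\` in the value's string form.
      key = key.replaceFirst(Pattern.quote("?"), Matcher.quoteReplacement(String.valueOf(param)));
    }
    return key;
  }
}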
------------------------

package net.helenus.core.operation;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public abstract class AbstractCache<K, V> {
  final Logger logger = LoggerFactory.getLogger(getClass());
  public Cache<K, V> cache;

  public AbstractCache() {
    RemovalListener<K, V> listener =
        new RemovalListener<K, V>() {
          @Override
          public void onRemoval(RemovalNotification<K, V> n) {
            if (n.wasEvicted()) {
              String cause = n.getCause().name();
              logger.info(cause);
            }
          }
        };

    // NOTE: weakKeys() would switch the cache to identity (==) comparison of keys, so a lookup
    // with an equal-but-distinct CacheKey instance would always miss; keys must compare by value.
    cache = CacheBuilder.newBuilder()
        .maximumSize(10_000)
        .expireAfterAccess(20, TimeUnit.MINUTES)
        .softValues()
        .removalListener(listener)
        .build();
  }

  V get(K key) {
    return cache.getIfPresent(key);
  }

  void put(K key, V value) {
    cache.put(key, value);
  }
}

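Since the cache compares keys with equals(), CacheKey needs a value-based equals()/hashCode(). A minimal sketch of such a key class; the field names and types here are illustrative assumptions, not taken from the source:

import java.util.Objects;

public final class CacheKey {
  private final Object entity;   // illustrative: the HelenusEntity this key belongs to
  private final String keyText;  // illustrative: the joined primary-key filter text

  public CacheKey(Object entity, String keyText) {
    this.entity = entity;
    this.keyText = keyText;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof CacheKey)) return false;
    CacheKey other = (CacheKey) o;
    return Objects.equals(entity, other.entity) && Objects.equals(keyText, other.keyText);
  }

  @Override
  public int hashCode() {
    return Objects.hash(entity, keyText);
  }
}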
------------------------------------------------------------------------------------------------

cache entities (2 methods) marked @Cacheable
cache entities in txn context
cache results when .cache() chained before .{a}sync() call, return an EvictableCacheItem<E> that has an .evict() method (see the sketch below)
fix txn .andThen() chains
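
A rough sketch of what that wrapper could look like; the name EvictableCacheItem and the .evict() method come from the note above, everything else (method names, generics) is guesswork:

// Sketch only: shape implied by the note above, details are assumptions.
public interface EvictableCacheItem<E> {
  // The cached value produced by the .sync()/.async() call.
  E get();

  // Remove this value from whichever statement/entity caches it was stored in.
  void evict();
}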
primitive types have default values (e.g. boolean, int, ...) but primitive wrapper classes do not and can be null (e.g. Boolean, Integer, ...)
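
A quick illustration of why that matters when mapping nullable columns:

public class DefaultsExample {
  static boolean primitiveFlag;  // defaults to false; can never represent "unset"
  static Boolean wrapperFlag;    // defaults to null; distinguishes "unset" from false

  public static void main(String[] args) {
    System.out.println(primitiveFlag); // false
    System.out.println(wrapperFlag);   // null
  }
}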
create table wal {
  id timeuuid,
  follows timeuuid,
  read <Counting Quotient Filter, Set<{keyspace, col, schema generation, timestamp}>>
  write <Counting Quotient Filter, Set<{keyspace, col, schema generation, timestamp}>>
  primary key (id, follows)
}
begin:
  - insert into wal (timeuuid, parent timeuuid,

// NOTE: Update operations have no meaning when they only contain primary key components, so,
// given that `properties` is ordered with the keys first, if we find that the last element
// is either a partition key or a clustering column then we know we should just skip this operation.
ColumnType ct = ((HelenusProperty) properties.toArray()[properties.size() - 1]).getColumnType();
if (ct == ColumnType.PARTITION_KEY || ct == ColumnType.CLUSTERING_COLUMN) {
  return;
}

public Stream<E> sync() {
  ListenableFuture<Stream<E>> future = async();
  Futures.addCallback(future, new FutureCallback<Stream<E>>() {
    @Override
    public void onSuccess(Stream<E> result) {
      //...process the result stream
    }

    @Override
    public void onFailure(Throwable throwable) {
      log.error("Exception in task", throwable);
    }
  });
  // Block for the result, since this is the synchronous entry point.
  return Futures.getUnchecked(future);
}

-------

private mergeCache(Map<String, Set<Object>>

// Lazily concatenate two Iterables without copying their contents.
private static <E> Iterable<E> concat(
    Iterable<? extends E> i1,
    Iterable<? extends E> i2) {
  return new Iterable<E>() {
    public Iterator<E> iterator() {
      return new Iterator<E>() {
        Iterator<? extends E> listIterator = i1.iterator();
        Boolean checkedHasNext;
        E nextValue;
        private boolean startTheSecond;

        // Advance to the next value, switching to the second iterator once the first is drained.
        void theNext() {
          if (listIterator.hasNext()) {
            checkedHasNext = true;
            nextValue = listIterator.next();
          } else if (startTheSecond) {
            checkedHasNext = false;
          } else {
            startTheSecond = true;
            listIterator = i2.iterator();
            theNext();
          }
        }

        public boolean hasNext() {
          if (checkedHasNext == null) {
            theNext();
          }
          return checkedHasNext;
        }

        public E next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          checkedHasNext = null;
          return nextValue;
        }

        public void remove() {
          listIterator.remove();
        }
      };
    }
  };
}

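Usage is what you'd expect; for example (a fragment, assuming the helper is reachable from the call site):

import java.util.Arrays;

// Prints "1 2 3 4", pulling from the second list only after the first is exhausted.
for (Integer i : concat(Arrays.asList(1, 2), Arrays.asList(3, 4))) {
  System.out.print(i + " ");
}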
----------------------------------

if ("ttl".equals(methodName) && method.getParameterCount() == 1 && method.getReturnType() == int.class) {
  Getter getter = (Getter) args[0];
  if (getter == null) {
    return 0; // the proxied method returns int, so a Boolean here would fail at the proxy boundary
  }
  HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
  String ttlKeyForProperty = prop.getColumnName().toCql() + "_ttl";
  if (src.containsKey(ttlKeyForProperty)) {
    return src.get(ttlKeyForProperty);
  } else {
    return 0;
  }
}

if ("written".equals(methodName) && method.getParameterCount() == 1 && method.getReturnType() == int.class) {
  Getter getter = (Getter) args[0];
  if (getter == null) {
    return 0;
  }
  HelenusProperty prop = MappingUtil.resolveMappingProperty(getter).getProperty();
  // "_writeTime" is an assumed suffix for write-time columns; "_ttl" belongs to the TTL branch above.
  String writeTimeKeyForProperty = prop.getColumnName().toCql() + "_writeTime";
  if (src.containsKey(writeTimeKeyForProperty)) {
    return src.get(writeTimeKeyForProperty);
  } else {
    return 0;
  }
}

-----------------

public void setPurpose(String purpose) {
  purpose_ = purpose;
}

public void logTimers(String what) {
  // Times are in milliseconds (elapsed microseconds / 1000); the percentage is the share of the
  // total wall time spent in the database, computed in floating point to avoid integer division.
  double totalMs = elapsedTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
  double dbMs = databaseTime_.elapsed(TimeUnit.MICROSECONDS) / 1000.0;
  LOG.info(String.format("UOW(%s) %s %s (total: %.3fms db: %.3fms or %2.2f%% of total time)",
      hashCode(), purpose_, what, totalMs, dbMs, (dbMs / totalMs) * 100.0));
}
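
With those units, a log line would read something like this (illustrative values):

UOW(366712642) loading committed (total: 10.000ms db: 2.500ms or 25.00% of total time)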

--- postCommitFunction

elapsedTime_.stop();
if (purpose_ != null) {
  logTimers("committed");
}

--- abort

elapsedTime_.stop();
if (purpose_ != null) {
  logTimers("aborted");
}