Caching entity results within the context of a UOW (even nested) seems to be working.

This commit is contained in:
Greg Burd 2017-09-12 10:53:51 -04:00
parent 1b46ee0ed1
commit 58b29ad181
33 changed files with 701 additions and 499 deletions

269
NOTES
View file

@ -1,4 +1,226 @@
--- Cache
// `E` is the type of the Entity class or one of:
// - ResultSet
// - ArrayTuple{N}
// - Count
// `F` is the type argument passed to us from HelenusSession DSL and carried on via one of the
// Operation classes, it is going to be one of:
// - ResultSet
// - ArrayTuple{N}
// - or a type previously registered as a HelenusEntity.
// In the form of a:
// - Stream<?> or an
// - Optional<?>
//
// Operation/
// |-- AbstractStatementOperation
// | |-- AbstractOperation
// | | |-- AbstractFilterOperation
// | | | |-- CountOperation
// | | | |-- DeleteOperation
// | | | `-- UpdateOperation
// | | |-- BoundOperation
// | | `-- InsertOperation
// | |-- AbstractOptionalOperation
// | | |-- AbstractFilterOptionalOperation
// | | | |-- SelectFirstOperation
// | | | `-- SelectFirstTransformingOperation
// | | `-- BoundOptionalOperation
// | `-- AbstractStreamOperation
// | |-- AbstractFilterStreamOperation
// | | |-- SelectOperation
// | | `-- SelectTransformingOperation
// | `-- BoundStreamOperation
// |-- PreparedOperation
// |-- PreparedOptionalOperation
// `-- PreparedStreamOperation
//
// These all boil down to: Select, Update, Insert, Delete and Count
//
// -- Select:
// 1) Select statements that contain all primary key information will be "distinct" and
// result in a single value or no match.
// If present, return cached entity otherwise execute query and cache result.
//
// 2) Otherwise the result is a set, possibly empty, of values that match.
// When within a UOW:
// If present, return the cached value(s) from the statement cache matching the query string.
// Otherwise, execute query and cache the result in the statement cache and update/merge the
// entites into the entity cache.
// NOTE: When we read data from the database we augment the select clause with TTL and write time
// stamps for all columns that record such information so as to be able to properlty expire
// and merge values in the cache.
//
// -- Update:
// Execute the database statement and then iff successs upsert the entity being updated into the
// entity cache.
//
// -- Insert/Upsert:
// Same as Update.
//
// -- Delete:
// Same as update, only remove the cached value from all caches on success.
//
// -- Count:
// If operating within a UOW lookup count in statement cache, if not present execute query and cache result.
//
if (delegate instanceof SelectOperation) {
SelectOperation<E> op = (SelectOperation<E>) delegate;
// Determine if we are caching and if so where.
AbstractCache<CacheKey, Set<E>> cache = delegate.getCache();
boolean prepareStatementForCaching = cache != null;
if (uow != null) {
prepareStatementForCaching = true;
cache = uow.<Set<E>>getCacheEnclosing(cache);
}
// The delegate will provide the cache key becuase it will either be:
// a) when distinct: the combination of the partition/cluster key columns
// b) otherwise: the table name followed by the portion of the SQL statement that would form the WHERE clause
CacheKey key = (cache == null) ? null : delegate.getCacheKey();
if (key != null && cache != null) {
Set<E> value = cache.get(key);
if (value != null) {
// Select will always return a Stream<E>
// TODO(gburd): SelectTransforming... apply fn here?
result = (E) value.stream();
if (cacheHitCounter != null) {
cacheHitCounter.inc();
}
if (log != null) {
log.info("cache hit");
}
return result;
} else {
if (cacheMissCounter != null) {
cacheMissCounter.inc();
}
if (log != null) {
log.info("cache miss");
}
}
}
}
if (cache != null) {
Object obj = delegate.unwrap(result);
if (obj != null) {
cache.put(key, obj);
}
delegate.<E>extract(result, key, cache);
}
}
}
// TODO: first, ask the delegate for the cacheKey
// if this is a SELECT query:
// if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations
// if INSERT/UPSERT/UPDATE
// if DELETE
// if COUNT
----------------------------
@Override
public CacheKey getCacheKey() {
List<String>keys = new ArrayList<>(filters.size());
HelenusEntity entity = props.get(0).getEntity();
for (HelenusPropertyNode prop : props) {
switch(prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
Filter filter = filters.get(prop.getProperty());
if (filter != null) {
keys.add(filter.toString());
} else {
// we're missing a part of the primary key, so we can't create a proper cache key
return null;
}
break;
default:
// We've past the primary key components in this ordered list, so we're done building
// the cache key.
if (keys.size() > 0) {
return new CacheKey(entity, Joiner.on(",").join(keys));
}
return null;
}
}
return null;
}
---------------------------
// TODO(gburd): create a statement that matches one that wasn't prepared
//String key =
// "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
//for (Object param : params) {
// key = key.replaceFirst(Pattern.quote("?"), param.toString());
//}
------------------------
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public abstract class AbstractCache<K, V> {
final Logger logger = LoggerFactory.getLogger(getClass());
public Cache<K, V> cache;
public AbstractCache() {
RemovalListener<K, V> listener =
new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> n) {
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
}
V get(K key) {
return cache.getIfPresent(key);
}
void put(K key, V value) {
cache.put(key, value);
}
}
------------------------------------------------------------------------------------------------
cache entites (2 methods) marked @Cacheable
cache entites in txn context
@ -46,3 +268,50 @@ begin:
}
});
}
-------
private mergeCache(Map<String, Set<Object>>
private static <E> Iterable<E> concat(
Iterable<? extends E> i1,
Iterable<? extends E> i2) {
return new Iterable<E>() {
public Iterator<E> iterator() {
return new Iterator<E>() {
Iterator<? extends E> listIterator = i1.iterator();
Boolean checkedHasNext;
E nextValue;
private boolean startTheSecond;
void theNext() {
if (listIterator.hasNext()) {
checkedHasNext = true;
nextValue = listIterator.next();
} else if (startTheSecond)
checkedHasNext = false;
else {
startTheSecond = true;
listIterator = i2.iterator();
theNext();
}
}
public boolean hasNext() {
if (checkedHasNext == null)
theNext();
return checkedHasNext;
}
public E next() {
if (!hasNext())
throw new NoSuchElementException();
checkedHasNext = null;
return nextValue;
}
public void remove() {
listIterator.remove();
}
};
}
};
}

View file

@ -26,8 +26,8 @@ public abstract class AbstractAuditedEntityDraft<E> extends AbstractEntityDraft<
set("modifiedAt", now);
}
Date createdAt() {
return (Date) get("createdAt");
public Date createdAt() {
return (Date) get("createdAt", Date.class);
}
}

View file

@ -27,29 +27,6 @@ public abstract class AbstractEntityDraft<E> implements Drafted<E> {
public E build() { return Helenus.map(getEntityClass(), toMap()); }
@SuppressWarnings("unchecked")
protected <T> T get(String key) {
T value = (T) entityMap.get(key);
if (value == null) {
T obj = (T) new Object() {
};
Class<?> primitiveType = Primitives.unwrap(obj.getClass());
if (Primitives.allPrimitiveTypes().contains(primitiveType)) {
DefaultPrimitiveTypes type = DefaultPrimitiveTypes.lookup(primitiveType);
if (type == null) {
throw new RuntimeException("unknown primitive type " + primitiveType.getTypeName());
}
return (T) type.getDefaultValue();
}
}
return value;
}
@SuppressWarnings("unchecked")
protected <T> T get(String key, Class<?> returnType) {
T value = (T) entityMap.get(key);

View file

@ -23,7 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.io.PrintStream;
import java.util.concurrent.Executor;
import net.helenus.core.operation.SessionCache;
import net.helenus.mapping.value.ColumnValuePreparer;
import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.HelenusException;
@ -124,6 +123,4 @@ public abstract class AbstractSessionOperations {
getPrintStream().println(cql);
}
public SessionCache getSessionCache() { return null; }
}

View file

@ -16,6 +16,7 @@
package net.helenus.core;
import brave.Tracer;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.*;
import net.helenus.core.operation.*;
@ -62,7 +63,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata;
private final SessionCache sessionCache;
HelenusSession(
Session session,
@ -93,7 +94,6 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
this.metadata = session.getCluster().getMetadata();
this.sessionCache = new SessionCache();
}
@Override
@ -568,6 +568,4 @@ public final class HelenusSession extends AbstractSessionOperations implements C
}
}
public SessionCache getSessionCache() { return sessionCache; }
}

View file

@ -39,7 +39,7 @@ public final class SessionInitializer extends AbstractSessionOperations {
private String usingKeyspace;
private boolean showCql = false;
private ConsistencyLevel consistencyLevel;
private MetricRegistry metricRegistry;
private MetricRegistry metricRegistry = new MetricRegistry();
private Tracer zipkinTracer;
private PrintStream printStream = System.out;
private Executor executor = MoreExecutors.directExecutor();

View file

@ -1,14 +1,9 @@
package net.helenus.core;
import com.datastax.driver.core.ResultSet;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import net.helenus.core.operation.AbstractCache;
import net.helenus.core.operation.CacheKey;
import net.helenus.core.operation.UnitOfWorkCache;
import net.helenus.support.HelenusException;
import java.io.IOException;
import java.util.*;
@ -18,7 +13,7 @@ public final class UnitOfWork implements AutoCloseable {
private final HelenusSession session;
private final UnitOfWork parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<CacheKey, ResultSet> cache = new HashMap<>();
private final Map<String, Set<Object>> cache = new HashMap<String, Set<Object>>();
private boolean aborted = false;
private boolean committed = false;
@ -47,10 +42,6 @@ public final class UnitOfWork implements AutoCloseable {
return this;
}
public UnitOfWorkCache getCacheEnclosing(AbstractCache cache) {
return new UnitOfWorkCache(this, cache);
}
private void applyPostCommitFunctions() {
if (!postCommit.isEmpty()) {
for (CommitThunk f : postCommit) {
@ -59,9 +50,19 @@ public final class UnitOfWork implements AutoCloseable {
}
}
public UnitOfWork getEnclosingUnitOfWork() { return parent; }
public Set<Object> cacheLookup(String key) {
UnitOfWork p = this;
do {
Set<Object> r = p.getCache().get(key);
if (r != null) {
return r;
}
p = parent;
} while(p != null);
return null;
}
public Map<CacheKey, ResultSet> getCache() { return cache; }
public Map<String, Set<Object>> getCache() { return cache; }
private Iterator<UnitOfWork> getChildNodes() {
return nested.iterator();
@ -96,6 +97,21 @@ public final class UnitOfWork implements AutoCloseable {
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// Merge UOW cache into parent's cache.
if (parent != null) {
Map<String, Set<Object>> parentCache = parent.getCache();
for (String key : cache.keySet()) {
if (parentCache.containsKey(key)) {
// merge the sets
Set<Object> ps = parentCache.get(key);
ps.addAll(cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is.
} else {
// add the missing set
parentCache.put(key, cache.get(key));
}
}
}
// Apply all post-commit functions for
if (parent == null) {
traverser.postOrderTraversal(this).forEach(uow -> {
@ -111,7 +127,7 @@ public final class UnitOfWork implements AutoCloseable {
abort();
}
/** Explicitly discard the work and mark it as as such in the log. */
/* Explicitly discard the work and mark it as as such in the log. */
public void abort() {
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser.postOrderTraversal(this).forEach(uow -> {

View file

@ -1,48 +0,0 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public abstract class AbstractCache<K, V> {
final Logger logger = LoggerFactory.getLogger(getClass());
public Cache<K, V> cache;
public AbstractCache() {
RemovalListener<K, V> listener =
new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> n) {
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
}
V get(K key) {
return cache.getIfPresent(key);
}
void put(K key, V value) {
cache.put(key, value);
}
}

View file

@ -23,8 +23,7 @@ import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterOptionalOperation<
E, O extends AbstractFilterOptionalOperation<E, O>>
public abstract class AbstractFilterOptionalOperation<E, O extends AbstractFilterOptionalOperation<E, O>>
extends AbstractOptionalOperation<E, O> {
protected Map<HelenusProperty, Filter<?>> filters = null;

View file

@ -23,8 +23,7 @@ import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterStreamOperation<
E, O extends AbstractFilterStreamOperation<E, O>>
public abstract class AbstractFilterStreamOperation<E, O extends AbstractFilterStreamOperation<E, O>>
extends AbstractStreamOperation<E, O> {
protected Map<HelenusProperty, Filter<?>> filters = null;

View file

@ -15,28 +15,23 @@
*/
package net.helenus.core.operation;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<E> {
extends AbstractStatementOperation<E, O> {
public abstract E transform(ResultSet resultSet);
public AbstractCache getCache() {
return null;
}
public boolean cacheable() {
return false;
}
public CacheKey getCacheKey() {
return null;
}
public AbstractOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
@ -45,20 +40,37 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
return new PreparedOperation<E>(prepareStatement(), this);
}
public E sync() {
return Executioner.INSTANCE.<E>sync(sessionOps, null, traceContext, this, showValues);
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, showValues, false);
return transform(resultSet);
} finally {
context.stop();
}
}
public E sync(UnitOfWork uow) {
return Executioner.INSTANCE.<E>sync(sessionOps, uow, traceContext, this, showValues);
Objects.requireNonNull(uow, "Unit of Work should not be null.");
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = execute(sessionOps, uow, traceContext, showValues, true);
E result = transform(resultSet);
return result;
} finally {
context.stop();
}
}
public CompletableFuture<E> async() {
AbstractCache cache = getCache();
boolean cacheResult = cache != null;
return Executioner.INSTANCE.<E>async(sessionOps, null, traceContext, this, showValues);
return CompletableFuture.<E>supplyAsync(() -> sync());
}
public CompletableFuture<E> async(UnitOfWork uow) {
return Executioner.INSTANCE.<E>async(sessionOps, uow, traceContext, this, showValues);
Objects.requireNonNull(uow, "Unit of Work should not be null.");
return CompletableFuture.<E>supplyAsync(() -> sync(uow));
}
}

View file

@ -15,27 +15,24 @@
*/
package net.helenus.core.operation;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Optional<E>> {
private Function<Optional<E>, E> extractor = new Function<Optional<E>, E>() {
@Override
public E apply(Optional<E> e) {
return e.orElse(null);
}
};
extends AbstractStatementOperation<E, O> {
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -43,12 +40,6 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public abstract Optional<E> transform(ResultSet resultSet);
public AbstractCache getCache() { return null; }
public CacheKey getCacheKey() {
return null;
}
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
}
@ -66,19 +57,63 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
public Optional<E> sync() {
return Executioner.INSTANCE.<Optional<E>>sync(sessionOps, null, extractor, traceContext, this, showValues);
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, showValues, false);
return transform(resultSet);
} finally {
context.stop();
}
}
public Optional<E> sync(UnitOfWork uow) {
return Executioner.INSTANCE.<Optional<E>>sync(sessionOps, uow, extractor, traceContext, this, showValues);
Objects.requireNonNull(uow, "Unit of Work should not be null.");
final Timer.Context context = requestLatency.time();
try {
Optional<E> result = null;
String key = getStatementCacheKey();
if (key != null) {
Set<E> cachedResult = (Set<E>) uow.cacheLookup(key);
if (cachedResult != null) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
uowCacheHits.mark();
logger.info("UOW({}) cache hit, {} -> {}", uow.hashCode(), key, cachedResult.toString());
result = cachedResult.stream().findFirst();
}
}
if (result == null) {
uowCacheMiss.mark();
ResultSet resultSet = execute(sessionOps, uow, traceContext, showValues, true);
result = transform(resultSet);
if (key != null) {
if (result.isPresent()) {
Set<Object> set = new HashSet<Object>(1);
set.add(result.get());
uow.getCache().put(key, set);
} else {
uow.getCache().put(key, new HashSet<Object>(0));
}
}
}
return result;
} finally {
context.stop();
}
}
public CompletableFuture<Optional<E>> async() {
return Executioner.INSTANCE.<Optional<E>>async(sessionOps, null, extractor, traceContext, this, showValues);
return CompletableFuture.<Optional<E>>supplyAsync(() -> sync());
}
public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
return Executioner.INSTANCE.<Optional<E>>async(sessionOps, uow, extractor, traceContext, this, showValues);
Objects.requireNonNull(uow, "Unit of Work should not be null.");
return CompletableFuture.<Optional<E>>supplyAsync(() -> sync(uow));
}
}

View file

@ -32,12 +32,10 @@ import net.helenus.support.HelenusException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> {
public abstract class AbstractStatementOperation<E, O extends AbstractStatementOperation<E, O>> extends Operation<E> {
final Logger logger = LoggerFactory.getLogger(getClass());
protected final AbstractSessionOperations sessionOps;
public abstract Statement buildStatement(boolean cached);
protected boolean showValues = true;
@ -50,7 +48,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
private int[] fetchSize = null;
public AbstractStatementOperation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations;
super(sessionOperations);
this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel();
}

View file

@ -15,18 +15,29 @@
*/
package net.helenus.core.operation;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.mapping.value.ValueProviderMap;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Stream<E>> {
extends AbstractStatementOperation<E, O> {
public AbstractStreamOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -34,14 +45,6 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public abstract Stream<E> transform(ResultSet resultSet);
public AbstractCache getCache() {
return null;
}
public CacheKey getCacheKey() {
return null;
}
public PreparedStreamOperation<E> prepare() {
return new PreparedStreamOperation<E>(prepareStatement(), this);
}
@ -59,18 +62,56 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
public Stream<E> sync() {
return Executioner.INSTANCE.<Stream<E>>sync(sessionOps, null, traceContext, this, showValues);
final Timer.Context context = requestLatency.time();
try {
ResultSet resultSet = this.execute(sessionOps, null, traceContext, showValues, false);
return transform(resultSet);
} finally {
context.stop();
}
}
public Stream<E> sync(UnitOfWork uow) {
return Executioner.INSTANCE.<Stream<E>>sync(sessionOps, uow, traceContext, this, showValues);
Objects.requireNonNull(uow, "Unit of Work should not be null.");
final Timer.Context context = requestLatency.time();
try {
Stream<E> result = null;
String key = getStatementCacheKey();
if (key != null) {
Set<E> cachedResult = (Set<E>) uow.cacheLookup(key);
if (cachedResult != null) {
//TODO(gburd): what about select ResultSet, Tuple... etc.?
uowCacheHits.mark();
logger.info("UOW({}) cache hit, {} -> {}", uow.hashCode(), key, cachedResult.toString());
result = cachedResult.stream();
}
}
if (result == null) {
uowCacheMiss.mark();
ResultSet resultSet = execute(sessionOps, uow, traceContext, showValues, true);
result = transform(resultSet);
if (key != null) {
uow.getCache().put(key, (Set<Object>) result);
}
}
return result;
} finally {
context.stop();
}
}
public CompletableFuture<Stream<E>> async() {
return Executioner.INSTANCE.<Stream<E>>async(sessionOps, null, traceContext, this, showValues);
return CompletableFuture.<Stream<E>>supplyAsync(() -> sync());
}
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
return Executioner.INSTANCE.<Stream<E>>async(sessionOps, uow, traceContext, this, showValues);
Objects.requireNonNull(uow, "Unit of Work should not be null.");
return CompletableFuture.<Stream<E>>supplyAsync(() -> sync(uow));
}
}

View file

@ -34,19 +34,7 @@ public final class BoundOptionalOperation<E>
}
@Override
public Optional<E> transform(ResultSet resultSet) {
return delegate.transform(resultSet);
}
@Override
public AbstractCache getCache() {
return delegate.getCache();
}
@Override
public CacheKey getCacheKey() {
return delegate.getCacheKey();
}
public Optional<E> transform(ResultSet resultSet) { return delegate.transform(resultSet); }
@Override
public Statement buildStatement(boolean cached) {

View file

@ -25,20 +25,15 @@ public final class BoundStreamOperation<E>
private final BoundStatement boundStatement;
private final AbstractStreamOperation<E, ?> delegate;
private final CacheKey cacheKey;
public BoundStreamOperation(
BoundStatement boundStatement, CacheKey cacheKey, AbstractStreamOperation<E, ?> operation) {
public BoundStreamOperation(BoundStatement boundStatement, AbstractStreamOperation<E, ?> operation) {
super(operation.sessionOps);
this.boundStatement = boundStatement;
this.cacheKey = cacheKey;
this.delegate = operation;
}
@Override
public AbstractCache getCache() {
return delegate.getCache();
}
public String getStatementCacheKey() { return delegate.getStatementCacheKey(); }
@Override
public Stream<E> transform(ResultSet resultSet) {
@ -46,12 +41,5 @@ public final class BoundStreamOperation<E>
}
@Override
public CacheKey getCacheKey() {
return cacheKey;
}
@Override
public Statement buildStatement(boolean cached) {
return boundStatement;
}
public Statement buildStatement(boolean cached) { return boundStatement; }
}

View file

@ -1,8 +0,0 @@
package net.helenus.core.operation;
import java.io.Serializable;
public interface CachableOperation {
public <T extends Serializable> T getCacheKey();
public <T extends Serializable> T valueToCache();
}

View file

@ -1,43 +0,0 @@
package net.helenus.core.operation;
import com.datastax.driver.core.Statement;
import net.helenus.mapping.HelenusEntity;
import java.io.Serializable;
public class CacheKey implements Serializable {
private String key;
private HelenusEntity entity;
CacheKey() {}
CacheKey(HelenusEntity entity, String key) {
this.entity = entity;
this.key = key;
}
public void set(String key) {
this.key = key;
}
public String toString() {
return entity.getName() + "." + key;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return key.equals(cacheKey.key);
}
@Override
public int hashCode() {
return key.hashCode();
}
}

View file

@ -1,105 +0,0 @@
package net.helenus.core.operation;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public enum Executioner {
INSTANCE;
<E> E sync(AbstractSessionOperations session, UnitOfWork uow, Function<E, ?> extractor,
TraceContext traceContext, OperationsDelegate<E> delegate, boolean showValues) {
return this.<E>execute(session, uow, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext,
OperationsDelegate<E> delegate, boolean showValues) {
return CompletableFuture.<E>supplyAsync(() -> execute(session, uow, traceContext, delegate, showValues));
}
public <E> E execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext,
OperationsDelegate<E> delegate, boolean showValues) {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
}
try {
if (span != null) {
span.name("cassandra");
span.start();
}
// Determine if we are caching and if so where to put the results.
AbstractCache<CacheKey, Object> cache = null;
boolean prepareStatementForCaching = false;
if (uow != null ) {
prepareStatementForCaching = true;
cache = uow.getCacheEnclosing(delegate.getCache());
} else {
cache = delegate.getCache();
prepareStatementForCaching = cache != null;
}
// TODO: first, ask the delegate for the cacheKey
// if this is a SELECT query:
// if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations
// if INSERT/UPSERT/UPDATE
// if DELETE
// if COUNT
CacheKey key = (cache == null) ? null : delegate.getCacheKey();
E result = null;
if (key != null && cache != null) {
// Right now we only support Optional<Entity> fetch via complete primary key.
Object value = cache.get(key);
if (value != null) {
result = (E) Optional.of(value);
// if log statements: log cache hit for entity, primary key
// metrics.cacheHit +1
} else {
// if log statements: log cache miss for entity, primary key
// metrics.cacheMiss +1
}
}
ResultSet resultSet = null;
if (result == null) {
Statement statement = delegate.options(delegate.buildStatement(prepareStatementForCaching));
// if log statements... log it here.
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
resultSet = futureResultSet.get();
}
result = delegate.transform(resultSet);
if (cache != null) {
updateCache.apply(result, cache);
}
return (E) result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
}
}
}

View file

@ -21,9 +21,12 @@ import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import java.util.*;
import java.util.function.Function;
import com.google.common.base.Joiner;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.Getter;
import net.helenus.core.Helenus;
import net.helenus.core.UnitOfWork;
import net.helenus.core.reflect.DefaultPrimitiveTypes;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusEntity;
@ -224,4 +227,38 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
+ p.getEntity().getMappingInterface());
}
}
@Override
public String getStatementCacheKey() {
List<String> keys = new ArrayList<>(values.size());
values.forEach(
t -> {
HelenusPropertyNode prop = t._1;
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
keys.add(prop.getColumnName() + "==" + t._2.toString());
break;
default:
break;
}
});
return entity.getName() + ": " + Joiner.on(",").join(keys);
}
@Override
public T sync(UnitOfWork uow) {
T result = super.sync(uow);
Class<?> iface = entity.getMappingInterface();
if (resultType == iface) {
String key = getStatementCacheKey();
if (key != null) {
Set<Object> set = new HashSet<Object>(1);
set.add(result);
uow.getCache().put(key, set);
}
}
return result;
}
}

View file

@ -0,0 +1,73 @@
package net.helenus.core.operation;
import java.util.concurrent.ExecutionException;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import brave.Span;
import brave.Tracer;
import brave.propagation.TraceContext;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class Operation<E> {
protected final AbstractSessionOperations sessionOps;
protected final Meter uowCacheHits;
protected final Meter uowCacheMiss;
protected final Timer requestLatency;
Operation(AbstractSessionOperations sessionOperations) {
this.sessionOps = sessionOperations;
MetricRegistry metrics = sessionOperations.getMetricRegistry();
this.uowCacheHits = metrics.meter("helenus.UOW-cache-hits");
this.uowCacheMiss = metrics.meter("helenus.UOW-cache-miss");
this.requestLatency = metrics.timer("helenus.request-latency");
}
public ResultSet execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, boolean showValues, boolean cached) {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
}
try {
if (span != null) {
span.name("cassandra");
span.start();
}
Statement statement = options(buildStatement(cached));
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return futureResultSet.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
}
}
public Statement options(Statement statement) { return statement; }
public Statement buildStatement(boolean cached) { return null; }
public String getStatementCacheKey() { return null; }
}

View file

@ -1,18 +0,0 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
interface OperationsDelegate<E> {
Statement buildStatement(boolean cached);
Statement options(Statement statement);
E transform(ResultSet resultSet);
AbstractCache getCache();
CacheKey getCacheKey();
}

View file

@ -35,16 +35,8 @@ public final class PreparedStreamOperation<E> {
}
public BoundStreamOperation<E> bind(Object... params) {
BoundStatement boundStatement = preparedStatement.bind(params);
String key =
"use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
for (Object param : params) {
key = key.replaceFirst(Pattern.quote("?"), param.toString());
}
return new BoundStreamOperation<E>(boundStatement, operation.getCacheKey(), operation);
return new BoundStreamOperation<E>(boundStatement, operation);
}
@Override

View file

@ -23,37 +23,30 @@ import java.util.function.Function;
public final class SelectFirstOperation<E>
extends AbstractFilterOptionalOperation<E, SelectFirstOperation<E>> {
private final SelectOperation<E> src;
private final SelectOperation<E> delegate;
public SelectFirstOperation(SelectOperation<E> src) {
super(src.sessionOps);
public SelectFirstOperation(SelectOperation<E> delegate) {
super(delegate.sessionOps);
this.src = src;
this.filters = src.filters;
this.ifFilters = src.ifFilters;
this.delegate = delegate;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
public <R> SelectFirstTransformingOperation<R, E> map(Function<E, R> fn) {
return new SelectFirstTransformingOperation<R, E>(src, fn);
return new SelectFirstTransformingOperation<R, E>(delegate, fn);
}
@Override
public AbstractCache getCache() {
return src.getCache();
}
@Override
public CacheKey getCacheKey() {
return src.getCacheKey();
}
public String getStatementCacheKey() { return delegate.getStatementCacheKey(); }
@Override
public BuiltStatement buildStatement(boolean cached) {
return src.buildStatement(cached);
return delegate.buildStatement(cached);
}
@Override
public Optional<E> transform(ResultSet resultSet) {
return src.transform(resultSet).findFirst();
return delegate.transform(resultSet).findFirst();
}
}

View file

@ -23,25 +23,28 @@ import java.util.function.Function;
public final class SelectFirstTransformingOperation<R, E>
extends AbstractFilterOptionalOperation<R, SelectFirstTransformingOperation<R, E>> {
private final SelectOperation<E> src;
private final SelectOperation<E> delegate;
private final Function<E, R> fn;
public SelectFirstTransformingOperation(SelectOperation<E> src, Function<E, R> fn) {
super(src.sessionOps);
public SelectFirstTransformingOperation(SelectOperation<E> delegate, Function<E, R> fn) {
super(delegate.sessionOps);
this.src = src;
this.delegate = delegate;
this.fn = fn;
this.filters = src.filters;
this.ifFilters = src.ifFilters;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
@Override
public String getStatementCacheKey() { return delegate.getStatementCacheKey(); }
@Override
public BuiltStatement buildStatement(boolean cached) {
return src.buildStatement(cached);
return delegate.buildStatement(cached);
}
@Override
public Optional<R> transform(ResultSet resultSet) {
return src.transform(resultSet).findFirst().map(fn);
return delegate.transform(resultSet).findFirst().map(fn);
}
}

View file

@ -48,7 +48,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
protected List<Ordering> ordering = null;
protected Integer limit = null;
protected boolean allowFiltering = false;
protected boolean ignoreSessionCache = false;
protected boolean cacheEntity = false;
public SelectOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -77,6 +77,8 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
super(sessionOperations);
cacheEntity = entity.isCacheable();
entity
.getOrderedProperties()
.stream()
@ -92,6 +94,8 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
super(sessionOperations);
this.rowMapper = rowMapper;
cacheEntity = entity.isCacheable();
entity
.getOrderedProperties()
.stream()
@ -105,6 +109,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
HelenusPropertyNode... props) {
super(sessionOperations);
this.rowMapper = rowMapper;
Collections.addAll(this.props, props);
}
@ -170,7 +175,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
public SelectOperation<E> ignoreCache() {
ignoreSessionCache = true;
cacheEntity = false;
return this;
}
@ -185,21 +190,12 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
@Override
public AbstractCache getCache() {// TODO, not really public API...
if (!ignoreSessionCache) {
return sessionOps.getSessionCache();
}
return null;
}
@Override
public CacheKey getCacheKey() {
List<String>keys = new ArrayList<>(filters.size());
public String getStatementCacheKey() {
List<String> keys = new ArrayList<>(filters.size());
HelenusEntity entity = props.get(0).getEntity();
for (HelenusPropertyNode prop : props) {
switch(prop.getProperty().getColumnType()) {
switch (prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
@ -207,15 +203,12 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
if (filter != null) {
keys.add(filter.toString());
} else {
// we're missing a part of the primary key, so we can't create a proper cache key
return null;
keys.add(prop.getColumnName() + "==?");
}
break;
default:
// We've past the primary key components in this ordered list, so we're done building
// the cache key.
if (keys.size() > 0) {
return new CacheKey(entity, Joiner.on(",").join(keys));
return entity.getName() + ": " + Joiner.on(",").join(keys);
}
return null;
}
@ -305,18 +298,11 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
@SuppressWarnings("unchecked")
@Override
public Stream<E> transform(ResultSet resultSet) {
if (rowMapper != null) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED), false)
.map(rowMapper);
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED), false).map(rowMapper);
} else {
return (Stream<E>)
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED),
false);
StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED),false);
}
}

View file

@ -23,30 +23,29 @@ import java.util.stream.Stream;
public final class SelectTransformingOperation<R, E>
extends AbstractFilterStreamOperation<R, SelectTransformingOperation<R, E>> {
private final SelectOperation<E> src;
private final SelectOperation<E> delegate;
private final Function<E, R> fn;
public SelectTransformingOperation(SelectOperation<E> src, Function<E, R> fn) {
super(src.sessionOps);
public SelectTransformingOperation(SelectOperation<E> delegate, Function<E, R> fn) {
super(delegate.sessionOps);
this.src = src;
this.delegate = delegate;
this.fn = fn;
this.filters = src.filters;
this.ifFilters = src.ifFilters;
this.filters = delegate.filters;
this.ifFilters = delegate.ifFilters;
}
@Override
public String getStatementCacheKey() { return delegate.getStatementCacheKey(); }
@Override
public BuiltStatement buildStatement(boolean cached) {
return src.buildStatement(cached);
}
@Override
public AbstractCache getCache() {
return src.getCache();
return delegate.buildStatement(cached);
}
@Override
public Stream<R> transform(ResultSet resultSet) {
return src.transform(resultSet).map(fn);
return delegate.transform(resultSet).map(fn);
}
}

View file

@ -1,25 +0,0 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import java.util.concurrent.ExecutionException;
public class SessionCache extends AbstractCache<CacheKey, ResultSet> {
protected ResultSet apply(CacheKey key, OperationsDelegate delegate)
throws InterruptedException, ExecutionException {
ResultSet resultSet = null;
resultSet = cache.getIfPresent(key);
if (resultSet != null) {
cache.put(key, resultSet);
}
return resultSet;
}
}

View file

@ -1,37 +0,0 @@
package net.helenus.core.operation;
import java.util.Optional;
import net.helenus.core.UnitOfWork;
public class UnitOfWorkCache extends AbstractCache<CacheKey, Object> {
private final UnitOfWork uow;
private AbstractCache<CacheKey, Object> sessionCache;
public UnitOfWorkCache(UnitOfWork uow, AbstractCache sessionCache) {
super();
this.sessionCache = sessionCache;
this.uow = uow;
}
@Override
Object get(CacheKey key) {
Object result = null;
UnitOfWork parent = null;
do {
result = uow.getCache().get(key);
parent = uow.getEnclosingUnitOfWork();
} while(result == null && parent != null);
if (result == null) {
result = sessionCache.get(key);
}
return result;
}
@Override
void put(CacheKey key, Object result) {
cache.put(key, result);
}
}

View file

@ -77,7 +77,8 @@ public abstract class AbstractEmbeddedCassandraTest {
String cql =
"CREATE KEYSPACE "
+ keyspace
+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};";
+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}"
+ " AND DURABLE_WRITES = false;";
System.out.println(cql + "\n");
session.execute(cql);

View file

@ -38,7 +38,7 @@ public interface Inventory {
// Immutable properties:
public UUID id() {
return this.<UUID>get("id");
return this.<UUID>get("id", UUID.class);
}
public long EMEA() {

View file

@ -48,16 +48,16 @@ public interface Supply {
// Immutable properties:
public UUID id() {
return this.<UUID>get("id");
return this.<UUID>get("id", UUID.class);
}
public String region() {
return this.<String>get("region");
return this.<String>get("region", String.class);
}
// Mutable properties:
public String code() {
return this.<String>get("code");
return this.<String>get("code", String.class);
}
public Draft code(String code) {
@ -70,7 +70,7 @@ public interface Supply {
}
public String description() {
return this.<String>get("description");
return this.<String>get("description", String.class);
}
public Draft description(String description) {
@ -83,7 +83,7 @@ public interface Supply {
}
public Map<String, Long> demand() {
return this.<Map<String, Long>>get("demand");
return this.<Map<String, Long>>get("demand", Map.class);
}
public Draft demand(Map<String, Long> demand) {
@ -96,7 +96,7 @@ public interface Supply {
}
public List<String> suppliers() {
return this.<List<String>>get("suppliers");
return this.<List<String>>get("suppliers", List.class);
}
public Draft suppliers(List<String> suppliers) {
@ -109,7 +109,7 @@ public interface Supply {
}
public Set<String> shipments() {
return this.<Set<String>>get("shipments");
return this.<Set<String>>get("shipments", Set.class);
}
public Draft shipments(Set<String> shipments) {

View file

@ -60,6 +60,91 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
widget = session.dsl(Widget.class);
}
@Test
public void testSelectAfterSelect() throws Exception {
Widget w1, w2;
UUID key = UUIDs.timeBased();
// This should inserted Widget, but not cache it.
session.<Widget>insert(widget)
.value(widget::id, key)
.value(widget::name, RandomString.make(20))
.sync();
try (UnitOfWork uow = session.begin()) {
// This should read from the database and return a Widget.
w1 = session.<Widget>select(widget)
.where(widget::id, eq(key))
.single()
.sync(uow)
.orElse(null);
// This should read from the cache and get the same instance of a Widget.
w2 = session.<Widget>select(widget)
.where(widget::id, eq(key))
.single()
.sync(uow)
.orElse(null);
uow.commit()
.andThen(() -> {
Assert.assertEquals(w1, w2);
});
}
}
@Test
public void testSelectAfterNestedSelect() throws Exception {
Widget w1, w2, w3, w4;
UUID key1 = UUIDs.timeBased();
UUID key2 = UUIDs.timeBased();
// This should inserted Widget, and not cache it in uow1.
try (UnitOfWork uow1 = session.begin()) {
w1 = session.<Widget>insert(widget)
.value(widget::id, key1)
.value(widget::name, RandomString.make(20))
.sync(uow1);
try (UnitOfWork uow2 = session.begin(uow1)) {
// This should read from uow1's cache and return the same Widget.
w2 = session.<Widget>select(widget)
.where(widget::id, eq(key1))
.single()
.sync(uow2)
.orElse(null);
Assert.assertEquals(w1, w2);
w3 = session.<Widget>insert(widget)
.value(widget::id, key2)
.value(widget::name, RandomString.make(20))
.sync(uow2);
uow2.commit()
.andThen(() -> {
Assert.assertEquals(w1, w2);
});
}
// This should read from the cache and get the same instance of a Widget.
w4 = session.<Widget>select(widget)
.where(widget::id, eq(key2))
.single()
.sync(uow1)
.orElse(null);
uow1.commit()
.andThen(() -> {
Assert.assertEquals(w3, w4);
});
}
}
/*
@Test
public void testSelectAfterInsertProperlyCachesEntity() throws Exception {
Widget w1, w2, w3, w4;
@ -106,5 +191,5 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
Assert.assertNotEquals(w1, w4);
Assert.assertTrue(w1.equals(w4));
}
*/
}