WIP: still working toward a solid caching scheme, progress but far from done.

This commit is contained in:
Greg Burd 2017-08-18 16:44:30 -04:00
parent 28aa3b1bae
commit 933526b05b
15 changed files with 192 additions and 126 deletions

10
NOTES
View file

@ -1,3 +1,13 @@
--- Cache
cache entites (2 methods) marked @Cacheable
cache entites in txn context
cache results when .cache() chained before .{a}sync() call, return a EvictableCacheItem<E> that has an .evict() method
fix txn .andThen() chains
primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...)
create table wal {

View file

@ -22,6 +22,10 @@ import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.PrintStream;
import java.util.concurrent.Executor;
import net.helenus.core.operation.AbstractCache;
import net.helenus.core.operation.CacheManager;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.value.ColumnValuePreparer;
import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.HelenusException;
@ -111,7 +115,7 @@ public abstract class AbstractSessionOperations {
return null;
}
public void cache(String key, Object value) {}
public AbstractCache cacheFor(CacheManager.Type type) { return null; }
RuntimeException translateException(RuntimeException e) {
if (e instanceof HelenusException) {

View file

@ -60,7 +60,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final RowColumnValueProvider valueProvider;
private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata;
private final Cache<String, Object> sessionCache;
private final CacheManager cacheManager;
private UnitOfWork currentUnitOfWork;
HelenusSession(
@ -92,13 +92,8 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
this.metadata = session.getCluster().getMetadata();
this.sessionCache =
CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS)
.recordStats()
.build();
this.currentUnitOfWork = null;
this.cacheManager = new CacheManager(this);
}
@Override
@ -200,16 +195,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C
}
}
public void cache(String key, Object value) {
sessionCache.put(key, value); // ttl
}
public <E> SelectOperation<E> select(Class<E> entityClass) {
Objects.requireNonNull(entityClass, "entityClass is empty");
ColumnValueProvider valueProvider = getValueProvider();
HelenusEntity entity = Helenus.entity(entityClass);
//TODO cache entity
return new SelectOperation<E>(
this,
entity,
@ -453,6 +445,11 @@ public final class HelenusSession extends AbstractSessionOperations implements C
return new DeleteOperation(this, Helenus.resolve(dsl));
}
@Override
public AbstractCache cacheFor(CacheManager.Type type) {
return cacheManager.of(type);
}
public Session getSession() {
return session;
}

View file

@ -0,0 +1,38 @@
package net.helenus.core.operation;
import java.util.concurrent.ExecutionException;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
public abstract class AbstractCache {
protected CacheManager.Type type;
protected Cache<String, ResultSet> cache;
public AbstractCache(CacheManager.Type type, Cache<String, ResultSet> cache) {
this.type = type;
this.cache = cache;
}
protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet)
throws InterruptedException, ExecutionException {
ResultSet resultSet = null;
switch (type) {
case FETCH:
resultSet = fetch(statement, delegate, futureResultSet);
break;
case MUTATE:
resultSet = mutate(statement, delegate, futureResultSet);
break;
}
return resultSet;
}
}

View file

@ -24,7 +24,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public abstract E transform(ResultSet resultSet);
protected CacheManager getCacheManager() { return null; }
protected AbstractCache getCache() { return null; }
public boolean cacheable() {
return false;
@ -42,11 +42,11 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public E sync() {
return Executioner.INSTANCE.<E>sync(
sessionOps, options(buildStatement()), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<E> async() {
return Executioner.INSTANCE.<E>async(
sessionOps, options(buildStatement()), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
}
}

View file

@ -33,7 +33,7 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public abstract Optional<E> transform(ResultSet resultSet);
protected CacheManager getCacheManager() { return null; }
protected AbstractCache getCache() { return null; }
public CacheKey getCacheKey() { return null; }
@ -56,11 +56,11 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public Optional<E> sync() {
return Executioner.INSTANCE.<Optional<E>>sync(
sessionOps, options(buildStatement()), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<Optional<E>> async() {
return Executioner.INSTANCE.<Optional<E>>async(
sessionOps, options(buildStatement()), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
}
}

View file

@ -33,7 +33,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public abstract Stream<E> transform(ResultSet resultSet);
protected CacheManager getCacheManager() { return null; }
protected AbstractCache getCache() { return null; }
public CacheKey getCacheKey() { return null; }
@ -55,11 +55,11 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public Stream<E> sync() {
return Executioner.INSTANCE.<Stream<E>>sync(
sessionOps, options(buildStatement()), getCacheManager(), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<Stream<E>> async() {
return Executioner.INSTANCE.<Stream<E>>async(
sessionOps, options(buildStatement()), getCacheManager(), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
}
}

View file

@ -38,6 +38,9 @@ public final class BoundOptionalOperation<E>
return delegate.transform(resultSet);
}
@Override
protected AbstractCache getCache() { return delegate.getCache(); }
@Override
public CacheKey getCacheKey() { return delegate.getCacheKey(); }

View file

@ -36,7 +36,7 @@ public final class BoundStreamOperation<E>
}
@Override
protected CacheManager getCacheManager() { return delegate.getCacheManager(); }
protected AbstractCache getCache() { return delegate.getCache(); }
@Override
public Stream<E> transform(ResultSet resultSet) {

View file

@ -1,9 +1,15 @@
package net.helenus.core.operation;
import com.datastax.driver.core.Statement;
public class CacheKey {
private String key;
static String of(Statement statement) {
return "use " + statement.getKeyspace() + "; " + statement.toString();
}
CacheKey() {}
CacheKey(String key) { this.key = key; }

View file

@ -3,47 +3,52 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import net.helenus.core.HelenusSession;
import net.helenus.mapping.HelenusEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public abstract class CacheManager {
public class CacheManager {
public enum Type { FETCH, MUTATE }
private static CacheManager sessionFetch = new SessionCacheManager(Type.FETCH);
final Logger logger = LoggerFactory.getLogger(getClass());
final HelenusSession session;
protected CacheManager.Type type;
private AbstractCache sessionFetch;
public CacheManager(HelenusSession session) {
this.session = session;
public static CacheManager of(Type type, HelenusEntity entity) {
if (entity != null && entity.isCacheable()) {
return sessionFetch;
}
return null;
RemovalListener<String, ResultSet> listener = new RemovalListener<String, ResultSet>() {
@Override
public void onRemoval(RemovalNotification<String, ResultSet> n){
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
Cache<String, ResultSet> cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
sessionFetch = new SessionCache(Type.FETCH, this, cache);
}
public CacheManager(Type type) {
this.type = type;
}
protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet)
throws InterruptedException, ExecutionException {
ResultSet resultSet = null;
switch (type) {
case FETCH:
resultSet = fetch(statement, delegate, futureResultSet);
break;
case MUTATE:
resultSet = mutate(statement, delegate, futureResultSet);
break;
}
return resultSet;
public AbstractCache of(CacheManager.Type type) {
return sessionFetch;
}
}

View file

@ -27,15 +27,12 @@ public enum Executioner {
<E> E sync(
AbstractSessionOperations session,
Statement statement,
CacheManager cacheManager,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
try {
return this.<E>async(session, statement, cacheManager, traceContext, delegate, showValues).get();
} catch (InterruptedException | ExecutionException e) {
throw new HelenusException(e);
}
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return this.<E>execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(
@ -48,36 +45,55 @@ public enum Executioner {
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
Statement statement,
CacheManager cacheManager,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
AbstractSessionOperations session,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
return CompletableFuture.<E>supplyAsync(
() -> {
Tracer tracer = session.getZipkinTracer();
final Span span =
(tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null;
try {
if (span != null) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return CompletableFuture.<E>supplyAsync(() ->
execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues));
}
public <E> E execute(ResultSetFuture futureResultSet,
AbstractSessionOperations session,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
}
try {
if (span != null) {
span.name("cassandra");
span.start();
}
ResultSet resultSet = cacheManager != null ? cacheManager.apply(statement, delegate, futureResultSet) :
futureResultSet.get();
E result = delegate.transform(resultSet);
return result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
}
});
ResultSet resultSet;
if (cache != null) {
resultSet = cache.apply(statement, delegate, futureResultSet);
} else {
resultSet = futureResultSet.get();
}
E result = delegate.transform(resultSet);
return result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
}
}
}

View file

@ -46,7 +46,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
protected Integer limit = null;
protected boolean allowFiltering = false;
protected CacheManager cacheManager;
public SelectOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -70,7 +69,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
};
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, null) ;
}
public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity) {
@ -83,7 +81,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, entity) ;
}
public SelectOperation(
@ -100,7 +97,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, entity) ;
}
public SelectOperation(
@ -112,7 +108,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
this.rowMapper = rowMapper;
Collections.addAll(this.props, props);
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, null) ;
}
public CountOperation count() {
@ -262,10 +257,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
}
protected CacheManager getCacheManager() {
return cacheManager;
}
private List<Ordering> getOrCreateOrdering() {
if (ordering == null) {
ordering = new ArrayList<Ordering>();

View file

@ -40,6 +40,9 @@ public final class SelectTransformingOperation<R, E>
return src.buildStatement();
}
@Override
protected AbstractCache getCache() { return src.getCache(); }
@Override
public Stream<R> transform(ResultSet resultSet) {
return src.transform(resultSet).map(fn);

View file

@ -1,48 +1,27 @@
package net.helenus.core.operation;
import java.util.concurrent.ExecutionException;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.cache.Cache;
public class SessionCacheManager extends CacheManager {
final Logger logger = LoggerFactory.getLogger(getClass());
public class SessionCache extends AbstractCache {
private Cache<String, ResultSet> cache;
private final CacheManager manager;
SessionCacheManager(CacheManager.Type type) {
super(type);
RemovalListener<String, ResultSet> listener;
listener = new RemovalListener<String, ResultSet>() {
@Override
public void onRemoval(RemovalNotification<String, ResultSet> n){
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
SessionCache(CacheManager.Type type, CacheManager manager, Cache<String, ResultSet> cache) {
super(type, cache);
this.manager = manager;
}
protected ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
CacheKey key = delegate.getCacheKey();
final String cacheKey = key == null ? statement.toString() : key.toString();
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
ResultSet resultSet = null;
if (cacheKey == null) {
resultSet = resultSetFuture.get();
@ -51,6 +30,7 @@ public class SessionCacheManager extends CacheManager {
if (resultSet == null) {
resultSet = resultSetFuture.get();
if (resultSet != null) {
planEvictionFor(statement);
cache.put(cacheKey, resultSet);
}
}
@ -64,9 +44,22 @@ public class SessionCacheManager extends CacheManager {
final String cacheKey = key == null ? statement.toString() : key.toString();
ResultSet resultSet = resultSetFuture.get();
if (cacheKey != null && resultSet != null) {
planEvictionFor(statement);
//manager.evictIfNecessary(statement, delegate);
cache.put(cacheKey, resultSet);
}
return resultSet;
}
private void planEvictionFor(Statement statement) {
//((Select)statement).table + statement.where.clauses.length == 0
//TTL for rows read
}
public ResultSet get(Statement statement, OperationsDelegate delegate) {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
return cache.getIfPresent(cacheKey);
}
}