diff --git a/NOTES b/NOTES index cd9a7f6..c8ef22e 100644 --- a/NOTES +++ b/NOTES @@ -1,3 +1,13 @@ +--- Cache + +cache entites (2 methods) marked @Cacheable +cache entites in txn context +cache results when .cache() chained before .{a}sync() call, return a EvictableCacheItem that has an .evict() method +fix txn .andThen() chains + + + + primitive types have default values, (e.g. boolean, int, ...) but primative wrapper classes do not and can be null (e.g. Boolean, Integer, ...) create table wal { diff --git a/src/main/java/net/helenus/core/AbstractSessionOperations.java b/src/main/java/net/helenus/core/AbstractSessionOperations.java index b7c8344..0d8f315 100644 --- a/src/main/java/net/helenus/core/AbstractSessionOperations.java +++ b/src/main/java/net/helenus/core/AbstractSessionOperations.java @@ -22,6 +22,10 @@ import com.datastax.driver.core.querybuilder.BuiltStatement; import com.google.common.util.concurrent.ListenableFuture; import java.io.PrintStream; import java.util.concurrent.Executor; + +import net.helenus.core.operation.AbstractCache; +import net.helenus.core.operation.CacheManager; +import net.helenus.mapping.HelenusEntity; import net.helenus.mapping.value.ColumnValuePreparer; import net.helenus.mapping.value.ColumnValueProvider; import net.helenus.support.HelenusException; @@ -111,7 +115,7 @@ public abstract class AbstractSessionOperations { return null; } - public void cache(String key, Object value) {} + public AbstractCache cacheFor(CacheManager.Type type) { return null; } RuntimeException translateException(RuntimeException e) { if (e instanceof HelenusException) { diff --git a/src/main/java/net/helenus/core/HelenusSession.java b/src/main/java/net/helenus/core/HelenusSession.java index 82a589d..a5e70cf 100644 --- a/src/main/java/net/helenus/core/HelenusSession.java +++ b/src/main/java/net/helenus/core/HelenusSession.java @@ -60,7 +60,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C private final RowColumnValueProvider valueProvider; private final StatementColumnValuePreparer valuePreparer; private final Metadata metadata; - private final Cache sessionCache; + private final CacheManager cacheManager; private UnitOfWork currentUnitOfWork; HelenusSession( @@ -92,13 +92,8 @@ public final class HelenusSession extends AbstractSessionOperations implements C this.valueProvider = new RowColumnValueProvider(this.sessionRepository); this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository); this.metadata = session.getCluster().getMetadata(); - this.sessionCache = - CacheBuilder.newBuilder() - .maximumSize(MAX_CACHE_SIZE) - .expireAfterAccess(MAX_CACHE_EXPIRE_SECONDS, TimeUnit.SECONDS) - .recordStats() - .build(); this.currentUnitOfWork = null; + this.cacheManager = new CacheManager(this); } @Override @@ -200,16 +195,13 @@ public final class HelenusSession extends AbstractSessionOperations implements C } } - public void cache(String key, Object value) { - sessionCache.put(key, value); // ttl - } - public SelectOperation select(Class entityClass) { Objects.requireNonNull(entityClass, "entityClass is empty"); ColumnValueProvider valueProvider = getValueProvider(); HelenusEntity entity = Helenus.entity(entityClass); + //TODO cache entity return new SelectOperation( this, entity, @@ -453,6 +445,11 @@ public final class HelenusSession extends AbstractSessionOperations implements C return new DeleteOperation(this, Helenus.resolve(dsl)); } + @Override + public AbstractCache cacheFor(CacheManager.Type type) { + return cacheManager.of(type); + } + public Session getSession() { return session; } diff --git a/src/main/java/net/helenus/core/operation/AbstractCache.java b/src/main/java/net/helenus/core/operation/AbstractCache.java new file mode 100644 index 0000000..17a4cef --- /dev/null +++ b/src/main/java/net/helenus/core/operation/AbstractCache.java @@ -0,0 +1,38 @@ +package net.helenus.core.operation; + +import java.util.concurrent.ExecutionException; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Statement; +import com.google.common.cache.Cache; + +public abstract class AbstractCache { + protected CacheManager.Type type; + protected Cache cache; + + public AbstractCache(CacheManager.Type type, Cache cache) { + this.type = type; + this.cache = cache; + } + + protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + throws InterruptedException, ExecutionException; + protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + throws InterruptedException, ExecutionException; + + public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet) + throws InterruptedException, ExecutionException { + ResultSet resultSet = null; + switch (type) { + case FETCH: + resultSet = fetch(statement, delegate, futureResultSet); + break; + case MUTATE: + resultSet = mutate(statement, delegate, futureResultSet); + break; + } + return resultSet; + } + +} diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index b5a3801..42e1826 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -24,7 +24,7 @@ public abstract class AbstractOperation> public abstract E transform(ResultSet resultSet); - protected CacheManager getCacheManager() { return null; } + protected AbstractCache getCache() { return null; } public boolean cacheable() { return false; @@ -42,11 +42,11 @@ public abstract class AbstractOperation> public E sync() { return Executioner.INSTANCE.sync( - sessionOps, options(buildStatement()), traceContext, this, showValues); + sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); } public CompletableFuture async() { return Executioner.INSTANCE.async( - sessionOps, options(buildStatement()), traceContext, this, showValues); + sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 1fab97e..5246c65 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -33,7 +33,7 @@ public abstract class AbstractOptionalOperation transform(ResultSet resultSet); - protected CacheManager getCacheManager() { return null; } + protected AbstractCache getCache() { return null; } public CacheKey getCacheKey() { return null; } @@ -56,11 +56,11 @@ public abstract class AbstractOptionalOperation sync() { return Executioner.INSTANCE.>sync( - sessionOps, options(buildStatement()), traceContext, this, showValues); + sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); } public CompletableFuture> async() { return Executioner.INSTANCE.>async( - sessionOps, options(buildStatement()), traceContext, this, showValues); + sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 1065221..911d3bb 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -33,7 +33,7 @@ public abstract class AbstractStreamOperation transform(ResultSet resultSet); - protected CacheManager getCacheManager() { return null; } + protected AbstractCache getCache() { return null; } public CacheKey getCacheKey() { return null; } @@ -55,11 +55,11 @@ public abstract class AbstractStreamOperation sync() { return Executioner.INSTANCE.>sync( - sessionOps, options(buildStatement()), getCacheManager(), traceContext, this, showValues); + sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); } public CompletableFuture> async() { return Executioner.INSTANCE.>async( - sessionOps, options(buildStatement()), getCacheManager(), traceContext, this, showValues); + sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java b/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java index f52597e..071dfab 100644 --- a/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundOptionalOperation.java @@ -38,6 +38,9 @@ public final class BoundOptionalOperation return delegate.transform(resultSet); } + @Override + protected AbstractCache getCache() { return delegate.getCache(); } + @Override public CacheKey getCacheKey() { return delegate.getCacheKey(); } diff --git a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java index 65361b4..4616dbe 100644 --- a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java @@ -36,7 +36,7 @@ public final class BoundStreamOperation } @Override - protected CacheManager getCacheManager() { return delegate.getCacheManager(); } + protected AbstractCache getCache() { return delegate.getCache(); } @Override public Stream transform(ResultSet resultSet) { diff --git a/src/main/java/net/helenus/core/operation/CacheKey.java b/src/main/java/net/helenus/core/operation/CacheKey.java index d984666..2f08402 100644 --- a/src/main/java/net/helenus/core/operation/CacheKey.java +++ b/src/main/java/net/helenus/core/operation/CacheKey.java @@ -1,9 +1,15 @@ package net.helenus.core.operation; +import com.datastax.driver.core.Statement; + public class CacheKey { private String key; + static String of(Statement statement) { + return "use " + statement.getKeyspace() + "; " + statement.toString(); + } + CacheKey() {} CacheKey(String key) { this.key = key; } diff --git a/src/main/java/net/helenus/core/operation/CacheManager.java b/src/main/java/net/helenus/core/operation/CacheManager.java index eeffe94..b9e26b4 100644 --- a/src/main/java/net/helenus/core/operation/CacheManager.java +++ b/src/main/java/net/helenus/core/operation/CacheManager.java @@ -3,47 +3,52 @@ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import net.helenus.core.HelenusSession; import net.helenus.mapping.HelenusEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; - -public abstract class CacheManager { +public class CacheManager { public enum Type { FETCH, MUTATE } - private static CacheManager sessionFetch = new SessionCacheManager(Type.FETCH); + final Logger logger = LoggerFactory.getLogger(getClass()); + final HelenusSession session; - protected CacheManager.Type type; + private AbstractCache sessionFetch; + public CacheManager(HelenusSession session) { + this.session = session; - public static CacheManager of(Type type, HelenusEntity entity) { - if (entity != null && entity.isCacheable()) { - return sessionFetch; - } - return null; + RemovalListener listener = new RemovalListener() { + @Override + public void onRemoval(RemovalNotification n){ + if (n.wasEvicted()) { + String cause = n.getCause().name(); + logger.info(cause); + } + } + }; + + Cache cache = CacheBuilder.newBuilder() + .maximumSize(10_000) + .expireAfterAccess(20, TimeUnit.MINUTES) + .weakKeys() + .softValues() + .removalListener(listener) + .build(); + + sessionFetch = new SessionCache(Type.FETCH, this, cache); } - public CacheManager(Type type) { - this.type = type; - } - - protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException; - protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException; - - public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet) - throws InterruptedException, ExecutionException { - ResultSet resultSet = null; - switch (type) { - case FETCH: - resultSet = fetch(statement, delegate, futureResultSet); - break; - case MUTATE: - resultSet = mutate(statement, delegate, futureResultSet); - break; - } - return resultSet; + public AbstractCache of(CacheManager.Type type) { + return sessionFetch; } } diff --git a/src/main/java/net/helenus/core/operation/Executioner.java b/src/main/java/net/helenus/core/operation/Executioner.java index 28f666e..361a0c5 100644 --- a/src/main/java/net/helenus/core/operation/Executioner.java +++ b/src/main/java/net/helenus/core/operation/Executioner.java @@ -27,15 +27,12 @@ public enum Executioner { E sync( AbstractSessionOperations session, Statement statement, - CacheManager cacheManager, + AbstractCache cache, TraceContext traceContext, OperationsDelegate delegate, boolean showValues) { - try { - return this.async(session, statement, cacheManager, traceContext, delegate, showValues).get(); - } catch (InterruptedException | ExecutionException e) { - throw new HelenusException(e); - } + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + return this.execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues); } public CompletableFuture async( @@ -48,36 +45,55 @@ public enum Executioner { } public CompletableFuture async( - AbstractSessionOperations session, - Statement statement, - CacheManager cacheManager, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + AbstractSessionOperations session, + Statement statement, + AbstractCache cache, + TraceContext traceContext, + OperationsDelegate delegate, + boolean showValues) { - return CompletableFuture.supplyAsync( - () -> { - Tracer tracer = session.getZipkinTracer(); - final Span span = - (tracer != null && traceContext != null) ? tracer.newChild(traceContext) : null; - try { - if (span != null) { + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + return CompletableFuture.supplyAsync(() -> + execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues)); + } + + public E execute(ResultSetFuture futureResultSet, + AbstractSessionOperations session, + Statement statement, + AbstractCache cache, + TraceContext traceContext, + OperationsDelegate delegate, + boolean showValues) { + + Tracer tracer = session.getZipkinTracer(); + Span span = null; + if (tracer != null && traceContext != null) { + span = tracer.newChild(traceContext); + } + + try { + if (span != null) { span.name("cassandra"); span.start(); - } - ResultSet resultSet = cacheManager != null ? cacheManager.apply(statement, delegate, futureResultSet) : - futureResultSet.get(); - E result = delegate.transform(resultSet); - - return result; - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } finally { - if (span != null) { - span.finish(); - } } - }); + + ResultSet resultSet; + if (cache != null) { + resultSet = cache.apply(statement, delegate, futureResultSet); + } else { + resultSet = futureResultSet.get(); + } + + E result = delegate.transform(resultSet); + + return result; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } finally { + if (span != null) { + span.finish(); + } + } } + } diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index db4ce44..ead9332 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -46,7 +46,6 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation new HelenusPropertyNode(p, Optional.empty())) .forEach(p -> this.props.add(p)); - this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, entity) ; } public SelectOperation( @@ -100,7 +97,6 @@ public final class SelectOperation extends AbstractFilterStreamOperation new HelenusPropertyNode(p, Optional.empty())) .forEach(p -> this.props.add(p)); - this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, entity) ; } public SelectOperation( @@ -112,7 +108,6 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation getOrCreateOrdering() { if (ordering == null) { ordering = new ArrayList(); diff --git a/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java b/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java index 970e8f7..00fff1e 100644 --- a/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java @@ -40,6 +40,9 @@ public final class SelectTransformingOperation return src.buildStatement(); } + @Override + protected AbstractCache getCache() { return src.getCache(); } + @Override public Stream transform(ResultSet resultSet) { return src.transform(resultSet).map(fn); diff --git a/src/main/java/net/helenus/core/operation/SessionCacheManager.java b/src/main/java/net/helenus/core/operation/SessionCache.java similarity index 54% rename from src/main/java/net/helenus/core/operation/SessionCacheManager.java rename to src/main/java/net/helenus/core/operation/SessionCache.java index 5b4d989..8cbfda5 100644 --- a/src/main/java/net/helenus/core/operation/SessionCacheManager.java +++ b/src/main/java/net/helenus/core/operation/SessionCache.java @@ -1,48 +1,27 @@ package net.helenus.core.operation; +import java.util.concurrent.ExecutionException; + import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; -import com.google.common.cache.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import com.datastax.driver.core.querybuilder.Select; +import com.google.common.cache.Cache; -public class SessionCacheManager extends CacheManager { - final Logger logger = LoggerFactory.getLogger(getClass()); +public class SessionCache extends AbstractCache { - private Cache cache; + private final CacheManager manager; - SessionCacheManager(CacheManager.Type type) { - super(type); - - RemovalListener listener; - listener = new RemovalListener() { - @Override - public void onRemoval(RemovalNotification n){ - if (n.wasEvicted()) { - String cause = n.getCause().name(); - logger.info(cause); - } - } - }; - - cache = CacheBuilder.newBuilder() - .maximumSize(10_000) - .expireAfterAccess(20, TimeUnit.MINUTES) - .weakKeys() - .softValues() - .removalListener(listener) - .build(); + SessionCache(CacheManager.Type type, CacheManager manager, Cache cache) { + super(type, cache); + this.manager = manager; } protected ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) throws InterruptedException, ExecutionException { - CacheKey key = delegate.getCacheKey(); - final String cacheKey = key == null ? statement.toString() : key.toString(); + final CacheKey key = delegate.getCacheKey(); + final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); ResultSet resultSet = null; if (cacheKey == null) { resultSet = resultSetFuture.get(); @@ -51,6 +30,7 @@ public class SessionCacheManager extends CacheManager { if (resultSet == null) { resultSet = resultSetFuture.get(); if (resultSet != null) { + planEvictionFor(statement); cache.put(cacheKey, resultSet); } } @@ -64,9 +44,22 @@ public class SessionCacheManager extends CacheManager { final String cacheKey = key == null ? statement.toString() : key.toString(); ResultSet resultSet = resultSetFuture.get(); if (cacheKey != null && resultSet != null) { + planEvictionFor(statement); + //manager.evictIfNecessary(statement, delegate); cache.put(cacheKey, resultSet); } return resultSet; } + private void planEvictionFor(Statement statement) { + //((Select)statement).table + statement.where.clauses.length == 0 + //TTL for rows read + } + + public ResultSet get(Statement statement, OperationsDelegate delegate) { + final CacheKey key = delegate.getCacheKey(); + final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); + return cache.getIfPresent(cacheKey); + } + }