WIP: Working toward a solution that can cache result sets and/or transformed entity instances when annotated with @Cacheable in the Session cache.

This commit is contained in:
Greg Burd 2017-08-17 16:00:19 -04:00
parent 0fd9ff828c
commit 28aa3b1bae
17 changed files with 290 additions and 72 deletions

View file

@ -220,7 +220,6 @@ public final class SessionInitializer extends AbstractSessionOperations {
}
public void singleton() {
Helenus.setSession(get());
}

View file

@ -20,17 +20,17 @@ import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations;
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
extends AbstractStatementOperation<E, O> implements Transformational<E> {
extends AbstractStatementOperation<E, O> implements OperationsDelegate<E> {
public abstract E transform(ResultSet resultSet);
protected CacheManager getCacheManager() { return null; }
public boolean cacheable() {
return false;
}
public String getCacheKey() {
return "";
}
public CacheKey getCacheKey() { return null; }
public AbstractOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);

View file

@ -25,7 +25,7 @@ import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> implements Transformational<Optional<E>> {
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Optional<E>> {
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -33,6 +33,10 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public abstract Optional<E> transform(ResultSet resultSet);
protected CacheManager getCacheManager() { return null; }
public CacheKey getCacheKey() { return null; }
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
}

View file

@ -25,7 +25,7 @@ import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> implements Transformational<Stream<E>> {
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Stream<E>> {
public AbstractStreamOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -33,6 +33,10 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public abstract Stream<E> transform(ResultSet resultSet);
protected CacheManager getCacheManager() { return null; }
public CacheKey getCacheKey() { return null; }
public PreparedStreamOperation<E> prepare() {
return new PreparedStreamOperation<E>(prepareStatement(), this);
}
@ -51,11 +55,11 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public Stream<E> sync() {
return Executioner.INSTANCE.<Stream<E>>sync(
sessionOps, options(buildStatement()), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCacheManager(), traceContext, this, showValues);
}
public CompletableFuture<Stream<E>> async() {
return Executioner.INSTANCE.<Stream<E>>async(
sessionOps, options(buildStatement()), traceContext, this, showValues);
sessionOps, options(buildStatement()), getCacheManager(), traceContext, this, showValues);
}
}

View file

@ -38,6 +38,9 @@ public final class BoundOptionalOperation<E>
return delegate.transform(resultSet);
}
@Override
public CacheKey getCacheKey() { return delegate.getCacheKey(); }
@Override
public Statement buildStatement() {
return boundStatement;

View file

@ -25,19 +25,27 @@ public final class BoundStreamOperation<E>
private final BoundStatement boundStatement;
private final AbstractStreamOperation<E, ?> delegate;
private final CacheKey cacheKey;
public BoundStreamOperation(
BoundStatement boundStatement, AbstractStreamOperation<E, ?> operation) {
BoundStatement boundStatement, CacheKey cacheKey, AbstractStreamOperation<E, ?> operation) {
super(operation.sessionOps);
this.boundStatement = boundStatement;
this.cacheKey = cacheKey;
this.delegate = operation;
}
@Override
protected CacheManager getCacheManager() { return delegate.getCacheManager(); }
@Override
public Stream<E> transform(ResultSet resultSet) {
return delegate.transform(resultSet);
}
@Override
public CacheKey getCacheKey() { return cacheKey; }
@Override
public Statement buildStatement() {
return boundStatement;

View file

@ -0,0 +1,30 @@
package net.helenus.core.operation;
public class CacheKey {
private String key;
CacheKey() {}
CacheKey(String key) { this.key = key; }
public void set(String key) { this.key = key; }
public String toString() { return key; }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
return key.equals(cacheKey.key);
}
@Override
public int hashCode() {
return key.hashCode();
}
}

View file

@ -0,0 +1,49 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import net.helenus.mapping.HelenusEntity;
import java.util.concurrent.ExecutionException;
public abstract class CacheManager {
public enum Type { FETCH, MUTATE }
private static CacheManager sessionFetch = new SessionCacheManager(Type.FETCH);
protected CacheManager.Type type;
public static CacheManager of(Type type, HelenusEntity entity) {
if (entity != null && entity.isCacheable()) {
return sessionFetch;
}
return null;
}
public CacheManager(Type type) {
this.type = type;
}
protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet)
throws InterruptedException, ExecutionException {
ResultSet resultSet = null;
switch (type) {
case FETCH:
resultSet = fetch(statement, delegate, futureResultSet);
break;
case MUTATE:
resultSet = mutate(statement, delegate, futureResultSet);
break;
}
return resultSet;
}
}

View file

@ -7,6 +7,7 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.support.HelenusException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -14,28 +15,48 @@ import java.util.concurrent.ExecutionException;
public enum Executioner {
INSTANCE;
<E> E sync(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
Transformational<E> delegate,
boolean showValues) {
<E> E sync(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
return sync(session, statement, null, traceContext, delegate, showValues);
}
<E> E sync(
AbstractSessionOperations session,
Statement statement,
CacheManager cacheManager,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
try {
return this.<E>async(session, statement, traceContext, delegate, showValues).get();
return this.<E>async(session, statement, cacheManager, traceContext, delegate, showValues).get();
} catch (InterruptedException | ExecutionException e) {
return null;
throw new HelenusException(e);
}
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
Transformational<E> delegate,
boolean showValues) {
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
return async(session, statement, null, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
Statement statement,
CacheManager cacheManager,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return CompletableFuture.supplyAsync(
return CompletableFuture.<E>supplyAsync(
() -> {
Tracer tracer = session.getZipkinTracer();
final Span span =
@ -45,7 +66,8 @@ public enum Executioner {
span.name("cassandra");
span.start();
}
ResultSet resultSet = futureResultSet.get(); // TODO: timeout
ResultSet resultSet = cacheManager != null ? cacheManager.apply(statement, delegate, futureResultSet) :
futureResultSet.get();
E result = delegate.transform(resultSet);
return result;

View file

@ -2,6 +2,7 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
public interface Transformational<E> {
public interface OperationsDelegate<E> {
E transform(ResultSet resultSet);
CacheKey getCacheKey();
}

View file

@ -18,6 +18,8 @@ package net.helenus.core.operation;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import java.util.regex.Pattern;
public final class PreparedStreamOperation<E> {
private final PreparedStatement preparedStatement;
@ -37,7 +39,10 @@ public final class PreparedStreamOperation<E> {
BoundStatement boundStatement = preparedStatement.bind(params);
return new BoundStreamOperation<E>(boundStatement, operation);
String key = "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
for (Object param : params) { key = key.replaceFirst(Pattern.quote("?"), param.toString()); }
return new BoundStreamOperation<E>(boundStatement, new CacheKey(key), operation);
}
@Override

View file

@ -46,4 +46,5 @@ public final class SelectFirstOperation<E>
public Optional<E> transform(ResultSet resultSet) {
return src.transform(resultSet).findFirst();
}
}

View file

@ -46,27 +46,31 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
protected Integer limit = null;
protected boolean allowFiltering = false;
protected CacheManager cacheManager;
public SelectOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.rowMapper =
new Function<Row, E>() {
new Function<Row, E>() {
@Override
public E apply(Row source) {
@Override
public E apply(Row source) {
ColumnValueProvider valueProvider = sessionOps.getValueProvider();
Object[] arr = new Object[props.size()];
ColumnValueProvider valueProvider = sessionOps.getValueProvider();
Object[] arr = new Object[props.size()];
int i = 0;
for (HelenusPropertyNode p : props) {
Object value = valueProvider.getColumnValue(source, -1, p.getProperty());
arr[i++] = value;
}
int i = 0;
for (HelenusPropertyNode p : props) {
Object value = valueProvider.getColumnValue(source, -1, p.getProperty());
arr[i++] = value;
}
return (E) Fun.ArrayTuple.of(arr);
}
};
return (E) Fun.ArrayTuple.of(arr);
}
};
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, null) ;
}
public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity) {
@ -74,35 +78,41 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
super(sessionOperations);
entity
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, entity) ;
}
public SelectOperation(
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
Function<Row, E> rowMapper) {
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
Function<Row, E> rowMapper) {
super(sessionOperations);
this.rowMapper = rowMapper;
entity
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, entity) ;
}
public SelectOperation(
AbstractSessionOperations sessionOperations,
Function<Row, E> rowMapper,
HelenusPropertyNode... props) {
AbstractSessionOperations sessionOperations,
Function<Row, E> rowMapper,
HelenusPropertyNode... props) {
super(sessionOperations);
this.rowMapper = rowMapper;
Collections.addAll(this.props, props);
this.cacheManager = CacheManager.of(CacheManager.Type.FETCH, null) ;
}
public CountOperation count() {
@ -114,10 +124,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity = prop.getEntity();
} else if (entity != prop.getEntity()) {
throw new HelenusMappingException(
"you can count records only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
"you can count records only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
}
}
@ -138,11 +148,11 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
this.rowMapper = null;
return new SelectTransformingOperation<R, E>(
this,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, sessionOps.getValueProvider(), entity);
return (R) Helenus.map(entityClass, map);
});
this,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, sessionOps.getValueProvider(), entity);
return (R) Helenus.map(entityClass, map);
});
}
public <R> SelectTransformingOperation<R, E> map(Function<E, R> fn) {
@ -192,10 +202,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity = prop.getEntity();
} else if (entity != prop.getEntity()) {
throw new HelenusMappingException(
"you can select columns only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
"you can select columns only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
}
}
@ -224,7 +234,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
if (ifFilters != null && !ifFilters.isEmpty()) {
logger.error(
"onlyIf conditions " + ifFilters + " would be ignored in the statement " + select);
"onlyIf conditions " + ifFilters + " would be ignored in the statement " + select);
}
if (allowFiltering) {
@ -242,16 +252,20 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED), false)
.map(rowMapper);
.map(rowMapper);
} else {
return (Stream<E>)
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED),
false);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED),
false);
}
}
protected CacheManager getCacheManager() {
return cacheManager;
}
private List<Ordering> getOrCreateOrdering() {
if (ordering == null) {
ordering = new ArrayList<Ordering>();

View file

@ -44,4 +44,5 @@ public final class SelectTransformingOperation<R, E>
public Stream<R> transform(ResultSet resultSet) {
return src.transform(resultSet).map(fn);
}
}

View file

@ -0,0 +1,72 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class SessionCacheManager extends CacheManager {
final Logger logger = LoggerFactory.getLogger(getClass());
private Cache<String, ResultSet> cache;
SessionCacheManager(CacheManager.Type type) {
super(type);
RemovalListener<String, ResultSet> listener;
listener = new RemovalListener<String, ResultSet>() {
@Override
public void onRemoval(RemovalNotification<String, ResultSet> n){
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
}
protected ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
CacheKey key = delegate.getCacheKey();
final String cacheKey = key == null ? statement.toString() : key.toString();
ResultSet resultSet = null;
if (cacheKey == null) {
resultSet = resultSetFuture.get();
} else {
resultSet = cache.getIfPresent(cacheKey);
if (resultSet == null) {
resultSet = resultSetFuture.get();
if (resultSet != null) {
cache.put(cacheKey, resultSet);
}
}
}
return resultSet;
}
protected ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
CacheKey key = delegate.getCacheKey();
final String cacheKey = key == null ? statement.toString() : key.toString();
ResultSet resultSet = resultSetFuture.get();
if (cacheKey != null && resultSet != null) {
cache.put(cacheKey, resultSet);
}
return resultSet;
}
}

View file

@ -16,10 +16,13 @@
package net.helenus.test.integration.core.prepared;
import java.math.BigDecimal;
import net.helenus.core.annotation.Cacheable;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;
@Table("cars")
@Cacheable
public interface Car {
@PartitionKey(ordinal = 0)

View file

@ -15,11 +15,13 @@
*/
package net.helenus.test.integration.core.simple;
import net.helenus.core.annotation.Cacheable;
import net.helenus.mapping.annotation.Column;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;
@Table("simple_users")
@Cacheable
public interface User {
@PartitionKey