WIP: toward caching at session and unit of work.

This commit is contained in:
Greg Burd 2017-08-25 16:13:31 -04:00
parent 7ac9288eb8
commit c35d6d19d1
21 changed files with 436 additions and 315 deletions

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.23-SNAPSHOT</version>
<version>2.0.24-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -22,10 +22,8 @@ import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.PrintStream;
import java.util.concurrent.Executor;
import net.helenus.core.operation.AbstractCache;
import net.helenus.core.operation.CacheManager;
import net.helenus.mapping.HelenusEntity;
import net.helenus.mapping.value.ColumnValuePreparer;
import net.helenus.mapping.value.ColumnValueProvider;
import net.helenus.support.HelenusException;
@ -115,7 +113,9 @@ public abstract class AbstractSessionOperations {
return null;
}
public AbstractCache cacheFor(CacheManager.Type type) { return null; }
public AbstractCache cacheFor(CacheManager.Type type) {
return null;
}
RuntimeException translateException(RuntimeException e) {
if (e instanceof HelenusException) {

View file

@ -42,6 +42,14 @@ import net.helenus.support.Fun.Tuple1;
import net.helenus.support.Fun.Tuple2;
import net.helenus.support.Fun.Tuple6;
import java.io.Closeable;
import java.io.PrintStream;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import static net.helenus.core.Query.eq;
public final class HelenusSession extends AbstractSessionOperations implements Closeable {
@ -65,7 +73,6 @@ public final class HelenusSession extends AbstractSessionOperations implements C
private final StatementColumnValuePreparer valuePreparer;
private final Metadata metadata;
private final CacheManager cacheManager;
private UnitOfWork currentUnitOfWork;
HelenusSession(
Session session,
@ -96,7 +103,6 @@ public final class HelenusSession extends AbstractSessionOperations implements C
this.valueProvider = new RowColumnValueProvider(this.sessionRepository);
this.valuePreparer = new StatementColumnValuePreparer(this.sessionRepository);
this.metadata = session.getCluster().getMetadata();
this.currentUnitOfWork = null;
this.cacheManager = new CacheManager(this);
}
@ -205,7 +211,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
ColumnValueProvider valueProvider = getValueProvider();
HelenusEntity entity = Helenus.entity(entityClass);
//TODO cache entity
//TODO cache entity
return new SelectOperation<E>(
this,
entity,
@ -498,7 +504,7 @@ public final class HelenusSession extends AbstractSessionOperations implements C
@Override
public AbstractCache cacheFor(CacheManager.Type type) {
return cacheManager.of(type);
return cacheManager.of(type);
}
public Session getSession() {

View file

@ -1,18 +1,35 @@
package net.helenus.core;
import com.diffplug.common.base.Errors;
import java.util.ArrayList;
import com.google.common.collect.TreeTraverser;
import java.util.*;
import java.util.function.Function;
/** Encapsulates the concept of a "transaction" as a unit-of-work. */
public class UnitOfWork {
private final HelenusSession session;
private ArrayList<UnitOfWork> nested;
static private final Map<HelenusSession, UnitOfWork> all = new HashMap<HelenusSession, UnitOfWork>();
static private final List<UnitOfWork> nested = new ArrayList<>();
UnitOfWork(HelenusSession session) {
this.session = session;
// log.record(txn::start)
private final HelenusSession session;
private ArrayList<Function> postCommit = new ArrayList<Function>();
private boolean aborted = false;
private boolean committed = false;
/**
* Marks the beginning of a transactional section of work. Will write a record to the shared
* write-ahead log.
*
* @return the handle used to commit or abort the work.
*/
static UnitOfWork begin(HelenusSession session) {
Objects.requireNonNull(session, "containing session cannot be null");
UnitOfWork uow = new UnitOfWork(session);
synchronized (all) {
all.put(session, uow);
}
return uow;
}
/**
@ -21,13 +38,37 @@ public class UnitOfWork {
*
* @return the handle used to commit or abort the work.
*/
public UnitOfWork begin() {
if (nested == null) {
nested = new ArrayList<UnitOfWork>();
static UnitOfWork begin(UnitOfWork parent) {
Objects.requireNonNull(parent, "parent unit of work cannot be null");
Objects.requireNonNull(all.get(parent), "parent unit of work is not currently active");
UnitOfWork uow = new UnitOfWork(parent.session);
synchronized (all) {
all.put(parent.session, uow);
parent.addNestedUnitOfWork(uow);
}
UnitOfWork unitOfWork = new UnitOfWork(session);
nested.add(unitOfWork);
return unitOfWork;
return uow;
}
private UnitOfWork(HelenusSession session) {
this.session = session;
// log.record(txn::start)
}
private void addNestedUnitOfWork(UnitOfWork uow) {
synchronized (nested) {
nested.add(uow);
}
}
private void applyPostCommitFunctions() {
for(Function f : postCommit) {
f.apply(null);
}
}
private Iterator<UnitOfWork> getChildNodes() {
return nested.iterator();
}
/**
@ -36,24 +77,58 @@ public class UnitOfWork {
* @return a function from which to chain work that only happens when commit is successful
* @throws ConflictingUnitOfWorkException when the work overlaps with other concurrent writers.
*/
public Function<Void, Void> commit() throws ConflictingUnitOfWorkException {
if (nested != null) {
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
}
public PostCommitFunction<Void, Void> commit() throws ConflictingUnitOfWorkException {
// All nested UnitOfWork should be committed (not aborted) before calls to commit, check.
boolean canCommit = true;
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
for (UnitOfWork uow : traverser.postOrderTraversal(this)) { canCommit &= (!uow.aborted && uow.committed); }
traverser.postOrderTraversal(this).forEach(uow -> { uow.applyPostCommitFunctions(); });
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// log.record(txn::provisionalCommit)
// examine log for conflicts in read-set and write-set between begin and provisional commit
// if (conflict) { throw new ConflictingUnitOfWorkException(this) }
// else return function so as to enable commit.andThen(() -> { do something iff commit was successful; })
return Function.<Void>identity();
return new PostCommitFunction<Void, Void>(this);
}
public void rollback() {
abort();
}
/** Explicitly discard the work and mark it as such in the log. */
public void abort() {
// log.record(txn::abort)
// cache.invalidateSince(txn::start time)
TreeTraverser<UnitOfWork> traverser = TreeTraverser.using(node -> node::getChildNodes);
traverser.postOrderTraversal(this).forEach(uow -> { uow.aborted = true; });
}
public String describeConflicts() {
return "it's complex...";
}
private class PostCommitFunction<T, R> implements java.util.function.Function<T, R> {
private final UnitOfWork uow;
PostCommitFunction(UnitOfWork uow) {
this.uow = uow;
}
@Override
public <V> PostCommitFunction<T, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
postCommit.add(after);
return null;
}
@Override
public R apply(T t) {
return null;
}
}
}

View file

@ -1,38 +1,40 @@
package net.helenus.core.operation;
import java.util.concurrent.ExecutionException;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import java.util.concurrent.ExecutionException;
public abstract class AbstractCache {
protected CacheManager.Type type;
protected Cache<String, ResultSet> cache;
protected CacheManager.Type type;
protected Cache<String, ResultSet> cache;
public AbstractCache(CacheManager.Type type, Cache<String, ResultSet> cache) {
this.type = type;
this.cache = cache;
public AbstractCache(CacheManager.Type type, Cache<String, ResultSet> cache) {
this.type = type;
this.cache = cache;
}
protected abstract ResultSet fetch(
Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
protected abstract ResultSet mutate(
Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
public ResultSet apply(
Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet)
throws InterruptedException, ExecutionException {
ResultSet resultSet = null;
switch (type) {
case FETCH:
resultSet = fetch(statement, delegate, futureResultSet);
break;
case MUTATE:
resultSet = mutate(statement, delegate, futureResultSet);
break;
}
protected abstract ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
protected abstract ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
public ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture futureResultSet)
throws InterruptedException, ExecutionException {
ResultSet resultSet = null;
switch (type) {
case FETCH:
resultSet = fetch(statement, delegate, futureResultSet);
break;
case MUTATE:
resultSet = mutate(statement, delegate, futureResultSet);
break;
}
return resultSet;
}
return resultSet;
}
}

View file

@ -18,19 +18,24 @@ package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<E> {
public abstract E transform(ResultSet resultSet);
protected AbstractCache getCache() { return null; }
protected AbstractCache getCache() {
return null;
}
public boolean cacheable() {
return false;
}
public CacheKey getCacheKey() { return null; }
public CacheKey getCacheKey() {
return null;
}
public AbstractOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
@ -42,11 +47,20 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public E sync() {
return Executioner.INSTANCE.<E>sync(
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public E sync(UnitOfWork uow) {
return Executioner.INSTANCE.<E>sync(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<E> async() {
return Executioner.INSTANCE.<E>async(
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<E> async(UnitOfWork uow) {
return Executioner.INSTANCE.<E>async(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
}
}

View file

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Optional<E>> {
@ -33,9 +34,13 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
public abstract Optional<E> transform(ResultSet resultSet);
protected AbstractCache getCache() { return null; }
protected AbstractCache getCache() {
return null;
}
public CacheKey getCacheKey() { return null; }
public CacheKey getCacheKey() {
return null;
}
public PreparedOptionalOperation<E> prepare() {
return new PreparedOptionalOperation<E>(prepareStatement(), this);
@ -54,13 +59,23 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
public Optional<E> sync() {
return Executioner.INSTANCE.<Optional<E>>sync(
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public Optional<E> sync(UnitOfWork uow) {
return Executioner.INSTANCE.<Optional<E>>sync(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<Optional<E>> async() {
return Executioner.INSTANCE.<Optional<E>>async(
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
return Executioner.INSTANCE.<Optional<E>>async(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
}
}

View file

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Stream<E>> {
@ -33,9 +34,13 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public abstract Stream<E> transform(ResultSet resultSet);
protected AbstractCache getCache() { return null; }
protected AbstractCache getCache() {
return null;
}
public CacheKey getCacheKey() { return null; }
public CacheKey getCacheKey() {
return null;
}
public PreparedStreamOperation<E> prepare() {
return new PreparedStreamOperation<E>(prepareStatement(), this);
@ -55,11 +60,21 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public Stream<E> sync() {
return Executioner.INSTANCE.<Stream<E>>sync(
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public Stream<E> sync(UnitOfWork uow) {
return Executioner.INSTANCE.<Stream<E>>sync(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<Stream<E>> async() {
return Executioner.INSTANCE.<Stream<E>>async(
sessionOps, options(buildStatement()), getCache(), traceContext, this, showValues);
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
}
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
return Executioner.INSTANCE.<Stream<E>>async(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
}
}

View file

@ -39,10 +39,14 @@ public final class BoundOptionalOperation<E>
}
@Override
protected AbstractCache getCache() { return delegate.getCache(); }
protected AbstractCache getCache() {
return delegate.getCache();
}
@Override
public CacheKey getCacheKey() { return delegate.getCacheKey(); }
public CacheKey getCacheKey() {
return delegate.getCacheKey();
}
@Override
public Statement buildStatement() {

View file

@ -36,7 +36,9 @@ public final class BoundStreamOperation<E>
}
@Override
protected AbstractCache getCache() { return delegate.getCache(); }
protected AbstractCache getCache() {
return delegate.getCache();
}
@Override
public Stream<E> transform(ResultSet resultSet) {
@ -44,7 +46,9 @@ public final class BoundStreamOperation<E>
}
@Override
public CacheKey getCacheKey() { return cacheKey; }
public CacheKey getCacheKey() {
return cacheKey;
}
@Override
public Statement buildStatement() {

View file

@ -4,33 +4,38 @@ import com.datastax.driver.core.Statement;
public class CacheKey {
private String key;
private String key;
static String of(Statement statement) {
return "use " + statement.getKeyspace() + "; " + statement.toString();
}
static String of(Statement statement) {
return "use " + statement.getKeyspace() + "; " + statement.toString();
}
CacheKey() {}
CacheKey() {}
CacheKey(String key) { this.key = key; }
CacheKey(String key) {
this.key = key;
}
public void set(String key) { this.key = key; }
public void set(String key) {
this.key = key;
}
public String toString() { return key; }
public String toString() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CacheKey cacheKey = (CacheKey) o;
CacheKey cacheKey = (CacheKey) o;
return key.equals(cacheKey.key);
}
@Override
public int hashCode() {
return key.hashCode();
}
return key.equals(cacheKey.key);
}
@Override
public int hashCode() {
return key.hashCode();
}
}

View file

@ -1,54 +1,53 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.util.concurrent.TimeUnit;
import net.helenus.core.HelenusSession;
import net.helenus.mapping.HelenusEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class CacheManager {
public enum Type { FETCH, MUTATE }
public enum Type {
FETCH,
MUTATE
}
final Logger logger = LoggerFactory.getLogger(getClass());
final HelenusSession session;
final Logger logger = LoggerFactory.getLogger(getClass());
final HelenusSession session;
private AbstractCache sessionFetch;
private AbstractCache sessionFetch;
public CacheManager(HelenusSession session) {
this.session = session;
public CacheManager(HelenusSession session) {
this.session = session;
RemovalListener<String, ResultSet> listener = new RemovalListener<String, ResultSet>() {
@Override
public void onRemoval(RemovalNotification<String, ResultSet> n){
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
RemovalListener<String, ResultSet> listener =
new RemovalListener<String, ResultSet>() {
@Override
public void onRemoval(RemovalNotification<String, ResultSet> n) {
if (n.wasEvicted()) {
String cause = n.getCause().name();
logger.info(cause);
}
}
};
Cache<String, ResultSet> cache = CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
Cache<String, ResultSet> cache =
CacheBuilder.newBuilder()
.maximumSize(10_000)
.expireAfterAccess(20, TimeUnit.MINUTES)
.weakKeys()
.softValues()
.removalListener(listener)
.build();
sessionFetch = new SessionCache(Type.FETCH, this, cache);
}
public AbstractCache of(CacheManager.Type type) {
return sessionFetch;
}
sessionFetch = new SessionCache(Type.FETCH, this, cache);
}
public AbstractCache of(CacheManager.Type type) {
return sessionFetch;
}
}

View file

@ -6,94 +6,81 @@ import brave.propagation.TraceContext;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.support.HelenusException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public enum Executioner {
INSTANCE;
<E> E sync(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
return sync(session, statement, null, traceContext, delegate, showValues);
<E> E sync(
AbstractSessionOperations session,
UnitOfWork uow,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return this.<E>execute(
futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
UnitOfWork uow,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return CompletableFuture.<E>supplyAsync(
() ->
execute(
futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues));
}
public <E> E execute(
ResultSetFuture futureResultSet,
AbstractSessionOperations session,
UnitOfWork uow,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
}
<E> E sync(
AbstractSessionOperations session,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return this.<E>execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
Statement statement,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
return async(session, statement, null, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return CompletableFuture.<E>supplyAsync(() ->
execute(futureResultSet, session, statement, cache, traceContext, delegate, showValues));
}
public <E> E execute(ResultSetFuture futureResultSet,
AbstractSessionOperations session,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
try {
if (span != null) {
span.name("cassandra");
span.start();
}
try {
if (span != null) {
span.name("cassandra");
span.start();
}
ResultSet resultSet;
if (cache != null) {
resultSet = cache.apply(statement, delegate, futureResultSet);
} else {
resultSet = futureResultSet.get();
}
E result = delegate.transform(resultSet);
return result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
ResultSet resultSet;
if (cache != null) {
resultSet = cache.apply(statement, delegate, futureResultSet);
} else {
resultSet = futureResultSet.get();
}
}
E result = delegate.transform(resultSet);
return result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (span != null) {
span.finish();
}
}
}
}

View file

@ -4,5 +4,6 @@ import com.datastax.driver.core.ResultSet;
public interface OperationsDelegate<E> {
E transform(ResultSet resultSet);
CacheKey getCacheKey();
}

View file

@ -17,7 +17,6 @@ package net.helenus.core.operation;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import java.util.regex.Pattern;
public final class PreparedStreamOperation<E> {
@ -39,8 +38,11 @@ public final class PreparedStreamOperation<E> {
BoundStatement boundStatement = preparedStatement.bind(params);
String key = "use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
for (Object param : params) { key = key.replaceFirst(Pattern.quote("?"), param.toString()); }
String key =
"use " + preparedStatement.getQueryKeyspace() + "; " + preparedStatement.getQueryString();
for (Object param : params) {
key = key.replaceFirst(Pattern.quote("?"), param.toString());
}
return new BoundStreamOperation<E>(boundStatement, new CacheKey(key), operation);
}

View file

@ -46,5 +46,4 @@ public final class SelectFirstOperation<E>
public Optional<E> transform(ResultSet resultSet) {
return src.transform(resultSet).findFirst();
}
}

View file

@ -46,29 +46,27 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
protected Integer limit = null;
protected boolean allowFiltering = false;
public SelectOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
this.rowMapper =
new Function<Row, E>() {
new Function<Row, E>() {
@Override
public E apply(Row source) {
@Override
public E apply(Row source) {
ColumnValueProvider valueProvider = sessionOps.getValueProvider();
Object[] arr = new Object[props.size()];
ColumnValueProvider valueProvider = sessionOps.getValueProvider();
Object[] arr = new Object[props.size()];
int i = 0;
for (HelenusPropertyNode p : props) {
Object value = valueProvider.getColumnValue(source, -1, p.getProperty());
arr[i++] = value;
}
return (E) Fun.ArrayTuple.of(arr);
}
};
int i = 0;
for (HelenusPropertyNode p : props) {
Object value = valueProvider.getColumnValue(source, -1, p.getProperty());
arr[i++] = value;
}
return (E) Fun.ArrayTuple.of(arr);
}
};
}
public SelectOperation(AbstractSessionOperations sessionOperations, HelenusEntity entity) {
@ -76,38 +74,35 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
super(sessionOperations);
entity
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
}
public SelectOperation(
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
Function<Row, E> rowMapper) {
AbstractSessionOperations sessionOperations,
HelenusEntity entity,
Function<Row, E> rowMapper) {
super(sessionOperations);
this.rowMapper = rowMapper;
entity
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
.getOrderedProperties()
.stream()
.map(p -> new HelenusPropertyNode(p, Optional.empty()))
.forEach(p -> this.props.add(p));
}
public SelectOperation(
AbstractSessionOperations sessionOperations,
Function<Row, E> rowMapper,
HelenusPropertyNode... props) {
AbstractSessionOperations sessionOperations,
Function<Row, E> rowMapper,
HelenusPropertyNode... props) {
super(sessionOperations);
this.rowMapper = rowMapper;
Collections.addAll(this.props, props);
}
public CountOperation count() {
@ -119,10 +114,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity = prop.getEntity();
} else if (entity != prop.getEntity()) {
throw new HelenusMappingException(
"you can count records only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
"you can count records only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
}
}
@ -143,11 +138,11 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
this.rowMapper = null;
return new SelectTransformingOperation<R, E>(
this,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, sessionOps.getValueProvider(), entity);
return (R) Helenus.map(entityClass, map);
});
this,
(r) -> {
Map<String, Object> map = new ValueProviderMap(r, sessionOps.getValueProvider(), entity);
return (R) Helenus.map(entityClass, map);
});
}
public <R> SelectTransformingOperation<R, E> map(Function<E, R> fn) {
@ -197,10 +192,10 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity = prop.getEntity();
} else if (entity != prop.getEntity()) {
throw new HelenusMappingException(
"you can select columns only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
"you can select columns only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
}
}
@ -229,7 +224,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
if (ifFilters != null && !ifFilters.isEmpty()) {
logger.error(
"onlyIf conditions " + ifFilters + " would be ignored in the statement " + select);
"onlyIf conditions " + ifFilters + " would be ignored in the statement " + select);
}
if (allowFiltering) {
@ -247,13 +242,13 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED), false)
.map(rowMapper);
.map(rowMapper);
} else {
return (Stream<E>)
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED),
false);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(resultSet.iterator(), Spliterator.ORDERED),
false);
}
}

View file

@ -41,11 +41,12 @@ public final class SelectTransformingOperation<R, E>
}
@Override
protected AbstractCache getCache() { return src.getCache(); }
protected AbstractCache getCache() {
return src.getCache();
}
@Override
public Stream<R> transform(ResultSet resultSet) {
return src.transform(resultSet).map(fn);
}
}

View file

@ -1,65 +1,63 @@
package net.helenus.core.operation;
import java.util.concurrent.ExecutionException;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.cache.Cache;
import java.util.concurrent.ExecutionException;
public class SessionCache extends AbstractCache {
private final CacheManager manager;
private final CacheManager manager;
SessionCache(CacheManager.Type type, CacheManager manager, Cache<String, ResultSet> cache) {
super(type, cache);
this.manager = manager;
}
SessionCache(CacheManager.Type type, CacheManager manager, Cache<String, ResultSet> cache) {
super(type, cache);
this.manager = manager;
}
protected ResultSet fetch(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
ResultSet resultSet = null;
if (cacheKey == null) {
resultSet = resultSetFuture.get();
} else {
resultSet = cache.getIfPresent(cacheKey);
if (resultSet == null) {
resultSet = resultSetFuture.get();
if (resultSet != null) {
planEvictionFor(statement);
cache.put(cacheKey, resultSet);
}
}
protected ResultSet fetch(
Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
ResultSet resultSet = null;
if (cacheKey == null) {
resultSet = resultSetFuture.get();
} else {
resultSet = cache.getIfPresent(cacheKey);
if (resultSet == null) {
resultSet = resultSetFuture.get();
if (resultSet != null) {
planEvictionFor(statement);
cache.put(cacheKey, resultSet);
}
return resultSet;
}
}
return resultSet;
}
protected ResultSet mutate(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
CacheKey key = delegate.getCacheKey();
final String cacheKey = key == null ? statement.toString() : key.toString();
ResultSet resultSet = resultSetFuture.get();
if (cacheKey != null && resultSet != null) {
planEvictionFor(statement);
//manager.evictIfNecessary(statement, delegate);
cache.put(cacheKey, resultSet);
}
return resultSet;
protected ResultSet mutate(
Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
CacheKey key = delegate.getCacheKey();
final String cacheKey = key == null ? statement.toString() : key.toString();
ResultSet resultSet = resultSetFuture.get();
if (cacheKey != null && resultSet != null) {
planEvictionFor(statement);
//manager.evictIfNecessary(statement, delegate);
cache.put(cacheKey, resultSet);
}
return resultSet;
}
private void planEvictionFor(Statement statement) {
//((Select)statement).table + statement.where.clauses.length == 0
//TTL for rows read
}
public ResultSet get(Statement statement, OperationsDelegate delegate) {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
return cache.getIfPresent(cacheKey);
}
private void planEvictionFor(Statement statement) {
//((Select)statement).table + statement.where.clauses.length == 0
//TTL for rows read
}
public ResultSet get(Statement statement, OperationsDelegate delegate) {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
return cache.getIfPresent(cacheKey);
}
}

View file

@ -16,7 +16,6 @@
package net.helenus.test.integration.core.prepared;
import java.math.BigDecimal;
import net.helenus.core.annotation.Cacheable;
import net.helenus.mapping.annotation.PartitionKey;
import net.helenus.mapping.annotation.Table;

View file

@ -92,13 +92,13 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
// select row and map to entity
User actual =
session
.selectAll(User.class)
.mapTo(User.class)
.where(user::id, eq(100L))
.sync()
.findFirst()
.get();
session
.selectAll(User.class)
.mapTo(User.class)
.where(user::id, eq(100L))
.sync()
.findFirst()
.get();
assertUsers(newUser, actual);
// select as object
@ -157,7 +157,7 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
name =
(String)
session
.<Fun.ArrayTuple>select()
.select()
.column(user::name)
.where(user::id, eq(100L))
.sync()
@ -198,10 +198,10 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
}).sync();
session
.update(user::name, "albert")
.set(user::age, 35)
.where(user::id, Operator.EQ, 100L)
.sync();
.update(user::name, "albert")
.set(user::age, 35)
.where(user::id, Operator.EQ, 100L)
.sync();
long cnt = session.count(user).where(user::id, Operator.EQ, 100L).sync();
Assert.assertEquals(1L, cnt);