WIP: More work toward entity cache, very broken at this point.

This commit is contained in:
Greg Burd 2017-09-01 15:38:58 -04:00
parent cb09324ac6
commit 1b46ee0ed1
41 changed files with 332 additions and 203 deletions

View file

@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.helenus</groupId>
<artifactId>helenus-core</artifactId>
<version>2.0.26-SNAPSHOT</version>
<version>2.0.27-SNAPSHOT</version>
<packaging>jar</packaging>
<name>helenus</name>

View file

@ -1,8 +1,10 @@
package net.helenus.core;
import com.datastax.driver.core.ResultSet;
import com.diffplug.common.base.Errors;
import com.google.common.collect.TreeTraverser;
import net.helenus.core.operation.AbstractCache;
import net.helenus.core.operation.CacheKey;
import net.helenus.core.operation.UnitOfWorkCache;
import net.helenus.support.HelenusException;
@ -16,6 +18,7 @@ public final class UnitOfWork implements AutoCloseable {
private final HelenusSession session;
private final UnitOfWork parent;
private List<CommitThunk> postCommit = new ArrayList<CommitThunk>();
private final Map<CacheKey, ResultSet> cache = new HashMap<>();
private boolean aborted = false;
private boolean committed = false;
@ -56,6 +59,10 @@ public final class UnitOfWork implements AutoCloseable {
}
}
public UnitOfWork getEnclosingUnitOfWork() { return parent; }
public Map<CacheKey, ResultSet> getCache() { return cache; }
private Iterator<UnitOfWork> getChildNodes() {
return nested.iterator();
}
@ -85,6 +92,8 @@ public final class UnitOfWork implements AutoCloseable {
committed = true;
aborted = false;
// TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold
nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit));
// Apply all post-commit functions for

View file

@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
public abstract class AbstractCache<K, V> {
final Logger logger = LoggerFactory.getLogger(getClass());
protected Cache<K, V> cache;
public Cache<K, V> cache;
public AbstractCache() {
RemovalListener<K, V> listener =
@ -38,8 +38,11 @@ public abstract class AbstractCache<K, V> {
.build();
}
protected abstract ResultSet apply(
Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException;
V get(K key) {
return cache.getIfPresent(key);
}
void put(K key, V value) {
cache.put(key, value);
}
}

View file

@ -15,15 +15,19 @@
*/
package net.helenus.core.operation;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterOptionalOperation<
E, O extends AbstractFilterOptionalOperation<E, O>>
extends AbstractOptionalOperation<E, O> {
protected List<Filter<?>> filters = null;
protected Map<HelenusProperty, Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null;
public AbstractFilterOptionalOperation(AbstractSessionOperations sessionOperations) {
@ -95,9 +99,9 @@ public abstract class AbstractFilterOptionalOperation<
private void addFilter(Filter<?> filter) {
if (filters == null) {
filters = new LinkedList<Filter<?>>();
filters = new LinkedHashMap<HelenusProperty, Filter<?>>();
}
filters.add(filter);
filters.put(filter.getNode().getProperty(), filter);
}
private void addIfFilter(Filter<?> filter) {

View file

@ -15,15 +15,19 @@
*/
package net.helenus.core.operation;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import net.helenus.core.*;
import net.helenus.mapping.HelenusProperty;
public abstract class AbstractFilterStreamOperation<
E, O extends AbstractFilterStreamOperation<E, O>>
extends AbstractStreamOperation<E, O> {
protected List<Filter<?>> filters = null;
protected Map<HelenusProperty, Filter<?>> filters = null;
protected List<Filter<?>> ifFilters = null;
public AbstractFilterStreamOperation(AbstractSessionOperations sessionOperations) {
@ -95,9 +99,9 @@ public abstract class AbstractFilterStreamOperation<
private void addFilter(Filter<?> filter) {
if (filters == null) {
filters = new LinkedList<Filter<?>>();
filters = new LinkedHashMap<HelenusProperty, Filter<?>>();
}
filters.add(filter);
filters.put(filter.getNode().getProperty(), filter);
}
private void addIfFilter(Filter<?> filter) {

View file

@ -25,7 +25,7 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
public abstract E transform(ResultSet resultSet);
protected AbstractCache getCache() {
public AbstractCache getCache() {
return null;
}
@ -46,21 +46,19 @@ public abstract class AbstractOperation<E, O extends AbstractOperation<E, O>>
}
public E sync() {
return Executioner.INSTANCE.<E>sync(
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<E>sync(sessionOps, null, traceContext, this, showValues);
}
public E sync(UnitOfWork uow) {
return Executioner.INSTANCE.<E>sync(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<E>sync(sessionOps, uow, traceContext, this, showValues);
}
public CompletableFuture<E> async() {
return Executioner.INSTANCE.<E>async(
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
AbstractCache cache = getCache();
boolean cacheResult = cache != null;
return Executioner.INSTANCE.<E>async(sessionOps, null, traceContext, this, showValues);
}
public CompletableFuture<E> async(UnitOfWork uow) {
return Executioner.INSTANCE.<E>async(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<E>async(sessionOps, uow, traceContext, this, showValues);
}
}

View file

@ -28,13 +28,22 @@ import net.helenus.core.UnitOfWork;
public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOperation<E, O>>
extends AbstractStatementOperation<E, O> implements OperationsDelegate<Optional<E>> {
private Function<Optional<E>, E> extractor = new Function<Optional<E>, E>() {
@Override
public E apply(Optional<E> e) {
return e.orElse(null);
}
};
public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) {
super(sessionOperations);
}
public abstract Optional<E> transform(ResultSet resultSet);
protected AbstractCache getCache() { return null; }
public AbstractCache getCache() { return null; }
public CacheKey getCacheKey() {
return null;
@ -57,23 +66,19 @@ public abstract class AbstractOptionalOperation<E, O extends AbstractOptionalOpe
}
public Optional<E> sync() {
return Executioner.INSTANCE.<Optional<E>>sync(
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Optional<E>>sync(sessionOps, null, extractor, traceContext, this, showValues);
}
public Optional<E> sync(UnitOfWork uow) {
return Executioner.INSTANCE.<Optional<E>>sync(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Optional<E>>sync(sessionOps, uow, extractor, traceContext, this, showValues);
}
public CompletableFuture<Optional<E>> async() {
return Executioner.INSTANCE.<Optional<E>>async(
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Optional<E>>async(sessionOps, null, extractor, traceContext, this, showValues);
}
public CompletableFuture<Optional<E>> async(UnitOfWork uow) {
return Executioner.INSTANCE.<Optional<E>>async(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Optional<E>>async(sessionOps, uow, extractor, traceContext, this, showValues);
}
}

View file

@ -38,7 +38,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
protected final AbstractSessionOperations sessionOps;
public abstract Statement buildStatement();
public abstract Statement buildStatement(boolean cached);
protected boolean showValues = true;
protected TraceContext traceContext;
@ -181,7 +181,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
return (O) this;
}
protected Statement options(Statement statement) {
public Statement options(Statement statement) {
if (defaultTimestamp != null) {
statement.setDefaultTimestamp(defaultTimestamp[0]);
@ -224,11 +224,11 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
}
public Statement statement() {
return buildStatement();
return buildStatement(false);
}
public String cql() {
Statement statement = buildStatement();
Statement statement = buildStatement(false);
if (statement == null) return "";
if (statement instanceof BuiltStatement) {
BuiltStatement buildStatement = (BuiltStatement) statement;
@ -240,7 +240,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public PreparedStatement prepareStatement() {
Statement statement = buildStatement();
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {
@ -254,7 +254,7 @@ public abstract class AbstractStatementOperation<E, O extends AbstractStatementO
public ListenableFuture<PreparedStatement> prepareStatementAsync() {
Statement statement = buildStatement();
Statement statement = buildStatement(true);
if (statement instanceof RegularStatement) {

View file

@ -34,7 +34,7 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
public abstract Stream<E> transform(ResultSet resultSet);
protected AbstractCache getCache() {
public AbstractCache getCache() {
return null;
}
@ -59,22 +59,18 @@ public abstract class AbstractStreamOperation<E, O extends AbstractStreamOperati
}
public Stream<E> sync() {
return Executioner.INSTANCE.<Stream<E>>sync(
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Stream<E>>sync(sessionOps, null, traceContext, this, showValues);
}
public Stream<E> sync(UnitOfWork uow) {
return Executioner.INSTANCE.<Stream<E>>sync(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Stream<E>>sync(sessionOps, uow, traceContext, this, showValues);
}
public CompletableFuture<Stream<E>> async() {
return Executioner.INSTANCE.<Stream<E>>async(
sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Stream<E>>async(sessionOps, null, traceContext, this, showValues);
}
public CompletableFuture<Stream<E>> async(UnitOfWork uow) {
return Executioner.INSTANCE.<Stream<E>>async(
sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues);
return Executioner.INSTANCE.<Stream<E>>async(sessionOps, uow, traceContext, this, showValues);
}
}

View file

@ -36,7 +36,7 @@ public final class BoundOperation<E> extends AbstractOperation<E, BoundOperation
}
@Override
public Statement buildStatement() {
public Statement buildStatement(boolean cached) {
return boundStatement;
}
}

View file

@ -39,7 +39,7 @@ public final class BoundOptionalOperation<E>
}
@Override
protected AbstractCache getCache() {
public AbstractCache getCache() {
return delegate.getCache();
}
@ -49,7 +49,7 @@ public final class BoundOptionalOperation<E>
}
@Override
public Statement buildStatement() {
public Statement buildStatement(boolean cached) {
return boundStatement;
}
}

View file

@ -36,7 +36,7 @@ public final class BoundStreamOperation<E>
}
@Override
protected AbstractCache getCache() {
public AbstractCache getCache() {
return delegate.getCache();
}
@ -51,7 +51,7 @@ public final class BoundStreamOperation<E>
}
@Override
public Statement buildStatement() {
public Statement buildStatement(boolean cached) {
return boundStatement;
}
}

View file

@ -1,20 +1,19 @@
package net.helenus.core.operation;
import com.datastax.driver.core.Statement;
import net.helenus.mapping.HelenusEntity;
import java.io.Serializable;
public class CacheKey implements Serializable {
private String key;
static String of(Statement statement) {
return "use " + statement.getKeyspace() + "; " + statement.toString();
}
private HelenusEntity entity;
CacheKey() {}
CacheKey(String key) {
CacheKey(HelenusEntity entity, String key) {
this.entity = entity;
this.key = key;
}
@ -23,7 +22,7 @@ public class CacheKey implements Serializable {
}
public String toString() {
return key;
return entity.getName() + "." + key;
}
@Override

View file

@ -40,7 +40,7 @@ public final class CountOperation extends AbstractFilterOperation<Long, CountOpe
}
@Override
public BuiltStatement buildStatement() {
public BuiltStatement buildStatement(boolean cached) {
if (filters != null && !filters.isEmpty()) {
filters.forEach(f -> addPropertyNode(f.getNode()));

View file

@ -46,7 +46,7 @@ public final class DeleteOperation extends AbstractFilterOperation<ResultSet, De
}
@Override
public BuiltStatement buildStatement() {
public BuiltStatement buildStatement(boolean cached) {
if (filters != null && !filters.isEmpty()) {
filters.forEach(f -> addPropertyNode(f.getNode()));

View file

@ -6,79 +6,93 @@ import brave.propagation.TraceContext;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import net.helenus.core.AbstractSessionOperations;
import net.helenus.core.UnitOfWork;
public enum Executioner {
INSTANCE;
<E> E sync(
AbstractSessionOperations session,
UnitOfWork uow,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return this.<E>execute(
futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues);
<E> E sync(AbstractSessionOperations session, UnitOfWork uow, Function<E, ?> extractor,
TraceContext traceContext, OperationsDelegate<E> delegate, boolean showValues) {
return this.<E>execute(session, uow, traceContext, delegate, showValues);
}
public <E> CompletableFuture<E> async(
AbstractSessionOperations session,
UnitOfWork uow,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
return CompletableFuture.<E>supplyAsync(
() ->
execute(
futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues));
public <E> CompletableFuture<E> async(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext,
OperationsDelegate<E> delegate, boolean showValues) {
return CompletableFuture.<E>supplyAsync(() -> execute(session, uow, traceContext, delegate, showValues));
}
public <E> E execute(
ResultSetFuture futureResultSet,
AbstractSessionOperations session,
UnitOfWork uow,
Statement statement,
AbstractCache cache,
TraceContext traceContext,
OperationsDelegate<E> delegate,
boolean showValues) {
public <E> E execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext,
OperationsDelegate<E> delegate, boolean showValues) {
// Start recording in a Zipkin sub-span our execution time to perform this operation.
Tracer tracer = session.getZipkinTracer();
Span span = null;
if (tracer != null && traceContext != null) {
span = tracer.newChild(traceContext);
}
if (uow != null) {
cache = uow.getCacheEnclosing(cache);
}
try {
if (span != null) {
span.name("cassandra");
span.start();
}
ResultSet resultSet;
if (cache != null) {
resultSet = cache.apply(statement, delegate, futureResultSet);
// Determine if we are caching and if so where to put the results.
AbstractCache<CacheKey, Object> cache = null;
boolean prepareStatementForCaching = false;
if (uow != null ) {
prepareStatementForCaching = true;
cache = uow.getCacheEnclosing(delegate.getCache());
} else {
resultSet = futureResultSet.get();
cache = delegate.getCache();
prepareStatementForCaching = cache != null;
}
E result = delegate.transform(resultSet);
// TODO: first, ask the delegate for the cacheKey
// if this is a SELECT query:
// if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations
// if INSERT/UPSERT/UPDATE
// if DELETE
// if COUNT
CacheKey key = (cache == null) ? null : delegate.getCacheKey();
E result = null;
if (key != null && cache != null) {
// Right now we only support Optional<Entity> fetch via complete primary key.
Object value = cache.get(key);
if (value != null) {
result = (E) Optional.of(value);
// if log statements: log cache hit for entity, primary key
// metrics.cacheHit +1
} else {
// if log statements: log cache miss for entity, primary key
// metrics.cacheMiss +1
}
}
ResultSet resultSet = null;
if (result == null) {
Statement statement = delegate.options(delegate.buildStatement(prepareStatementForCaching));
// if log statements... log it here.
ResultSetFuture futureResultSet = session.executeAsync(statement, showValues);
resultSet = futureResultSet.get();
}
result = delegate.transform(resultSet);
if (cache != null) {
updateCache.apply(result, cache);
}
return (E) result;
return result;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
@ -87,4 +101,5 @@ public enum Executioner {
}
}
}
}

View file

@ -120,7 +120,7 @@ public final class InsertOperation<T> extends AbstractOperation<T, InsertOperati
}
@Override
public BuiltStatement buildStatement() {
public BuiltStatement buildStatement(boolean cached) {
values.forEach(t -> addPropertyNode(t._1));

View file

@ -1,9 +1,18 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Statement;
interface OperationsDelegate<E> {
Statement buildStatement(boolean cached);
Statement options(Statement statement);
public interface OperationsDelegate<E> {
E transform(ResultSet resultSet);
AbstractCache getCache();
CacheKey getCacheKey();
}

View file

@ -44,7 +44,7 @@ public final class PreparedStreamOperation<E> {
key = key.replaceFirst(Pattern.quote("?"), param.toString());
}
return new BoundStreamOperation<E>(boundStatement, new CacheKey(key), operation);
return new BoundStreamOperation<E>(boundStatement, operation.getCacheKey(), operation);
}
@Override

View file

@ -38,8 +38,18 @@ public final class SelectFirstOperation<E>
}
@Override
public BuiltStatement buildStatement() {
return src.buildStatement();
public AbstractCache getCache() {
return src.getCache();
}
@Override
public CacheKey getCacheKey() {
return src.getCacheKey();
}
@Override
public BuiltStatement buildStatement(boolean cached) {
return src.buildStatement(cached);
}
@Override

View file

@ -36,8 +36,8 @@ public final class SelectFirstTransformingOperation<R, E>
}
@Override
public BuiltStatement buildStatement() {
return src.buildStatement();
public BuiltStatement buildStatement(boolean cached) {
return src.buildStatement(cached);
}
@Override

View file

@ -27,6 +27,9 @@ import java.util.*;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import net.helenus.core.*;
import net.helenus.core.reflect.HelenusPropertyNode;
import net.helenus.mapping.HelenusEntity;
@ -166,13 +169,6 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
return this;
}
protected AbstractCache getCache() {
if (!ignoreSessionCache) {
return sessionOps.getSessionCache();
}
return null;
}
public SelectOperation<E> ignoreCache() {
ignoreSessionCache = true;
return this;
@ -189,17 +185,53 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
}
@Override
public BuiltStatement buildStatement() {
public AbstractCache getCache() {// TODO, not really public API...
if (!ignoreSessionCache) {
return sessionOps.getSessionCache();
}
return null;
}
@Override
public CacheKey getCacheKey() {
List<String>keys = new ArrayList<>(filters.size());
HelenusEntity entity = props.get(0).getEntity();
for (HelenusPropertyNode prop : props) {
switch(prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
Filter filter = filters.get(prop.getProperty());
if (filter != null) {
keys.add(filter.toString());
} else {
// we're missing a part of the primary key, so we can't create a proper cache key
return null;
}
break;
default:
// We've past the primary key components in this ordered list, so we're done building
// the cache key.
if (keys.size() > 0) {
return new CacheKey(entity, Joiner.on(",").join(keys));
}
return null;
}
}
return null;
}
@Override
public BuiltStatement buildStatement(boolean cached) {
HelenusEntity entity = null;
Selection selection = QueryBuilder.select();
// iff in txn or cacheable add ttl and timestamp to result set for each col selected
// construct set of primary keys (partition and col)
// construct cache key
for (HelenusPropertyNode prop : props) {
selection = selection.column(prop.getColumnName());
String columnName = prop.getColumnName();
selection = selection.column(columnName);
if (prop.getProperty().caseSensitiveIndex()) {
allowFiltering = true;
@ -209,10 +241,29 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
entity = prop.getEntity();
} else if (entity != prop.getEntity()) {
throw new HelenusMappingException(
"you can select columns only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
"you can select columns only from a single entity "
+ entity.getMappingInterface()
+ " or "
+ prop.getEntity().getMappingInterface());
}
if (cached) {
switch(prop.getProperty().getColumnType()) {
case PARTITION_KEY:
case CLUSTERING_COLUMN:
break;
default:
if (entity.equals(prop.getEntity())) {
if (prop.getNext().isPresent()) {
columnName = Iterables.getLast(prop).getColumnName().toCql(true);
}
if (!prop.getProperty().getDataType().isCollectionType()) {
selection.writeTime(columnName);
selection.ttl(columnName);
}
}
break;
}
}
}
@ -234,7 +285,7 @@ public final class SelectOperation<E> extends AbstractFilterStreamOperation<E, S
Where where = select.where();
for (Filter<?> filter : filters) {
for (Filter<?> filter : filters.values()) {
where.and(filter.getClause(sessionOps.getValuePreparer()));
}
}

View file

@ -36,12 +36,12 @@ public final class SelectTransformingOperation<R, E>
}
@Override
public BuiltStatement buildStatement() {
return src.buildStatement();
public BuiltStatement buildStatement(boolean cached) {
return src.buildStatement(cached);
}
@Override
protected AbstractCache getCache() {
public AbstractCache getCache() {
return src.getCache();
}

View file

@ -6,31 +6,20 @@ import com.datastax.driver.core.Statement;
import com.google.common.cache.Cache;
import java.util.concurrent.ExecutionException;
public class SessionCache extends AbstractCache<String, ResultSet> {
public class SessionCache extends AbstractCache<CacheKey, ResultSet> {
protected ResultSet apply(
Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
protected ResultSet apply(CacheKey key, OperationsDelegate delegate)
throws InterruptedException, ExecutionException {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
ResultSet resultSet = null;
if (cacheKey == null) {
resultSet = resultSetFuture.get();
} else {
resultSet = cache.getIfPresent(cacheKey);
if (resultSet == null) {
resultSet = resultSetFuture.get();
if (resultSet != null) {
cache.put(cacheKey, resultSet);
}
}
resultSet = cache.getIfPresent(key);
if (resultSet != null) {
cache.put(key, resultSet);
}
return resultSet;
}
public ResultSet get(Statement statement, OperationsDelegate delegate) {
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
return cache.getIfPresent(cacheKey);
}
}

View file

@ -1,19 +1,13 @@
package net.helenus.core.operation;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import java.util.Optional;
import net.helenus.core.UnitOfWork;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class UnitOfWorkCache extends AbstractCache<String, ResultSet> {
public class UnitOfWorkCache extends AbstractCache<CacheKey, Object> {
private final UnitOfWork uow;
private final Map<String, ResultSet> cache = new HashMap<String, ResultSet>();
private AbstractCache sessionCache;
private AbstractCache<CacheKey, Object> sessionCache;
public UnitOfWorkCache(UnitOfWork uow, AbstractCache sessionCache) {
super();
@ -22,32 +16,22 @@ public class UnitOfWorkCache extends AbstractCache<String, ResultSet> {
}
@Override
protected ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture)
throws InterruptedException, ExecutionException {
return resultSetFuture.get();
/*
final CacheKey key = delegate.getCacheKey();
final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString();
ResultSet resultSet = null;
if (cacheKey == null) {
if (sessionCache != null) {
ResultSet rs = sessionCache.apply(statement, delegate, resultSetFuture);
if (rs != null) {
return rs;
}
}
} else {
resultSet = cache.get(cacheKey);
if (resultSet != null) {
return resultSet;
}
Object get(CacheKey key) {
Object result = null;
UnitOfWork parent = null;
do {
result = uow.getCache().get(key);
parent = uow.getEnclosingUnitOfWork();
} while(result == null && parent != null);
if (result == null) {
result = sessionCache.get(key);
}
resultSet = resultSetFuture.get();
if (resultSet != null) {
cache.put(cacheKey, resultSet);
}
return resultSet;
*/
return result;
}
@Override
void put(CacheKey key, Object result) {
cache.put(key, result);
}
}

View file

@ -507,7 +507,7 @@ public final class UpdateOperation<E> extends AbstractFilterOperation<E, UpdateO
}
@Override
public BuiltStatement buildStatement() {
public BuiltStatement buildStatement(boolean cached) {
if (entity == null) {
throw new HelenusMappingException("empty update operation");

View file

@ -0,0 +1,7 @@
package net.helenus.mapping.javatype;
public abstract class AbstractCollectionJavaType extends AbstractJavaType {
public static boolean isCollectionType() { return true; }
}

View file

@ -33,6 +33,8 @@ import net.helenus.support.HelenusMappingException;
public abstract class AbstractJavaType {
public static boolean isCollectionType() { return false; }
public abstract Class<?> getJavaClass();
public boolean isApplicable(Class<?> javaClass) {

View file

@ -35,7 +35,7 @@ import net.helenus.mapping.type.UDTListDataType;
import net.helenus.support.Either;
import net.helenus.support.HelenusMappingException;
public final class ListJavaType extends AbstractJavaType {
public final class ListJavaType extends AbstractCollectionJavaType {
@Override
public Class<?> getJavaClass() {

View file

@ -18,6 +18,7 @@ package net.helenus.mapping.javatype;
import com.datastax.driver.core.*;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.AbstractCollection;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@ -31,7 +32,7 @@ import net.helenus.mapping.type.*;
import net.helenus.support.Either;
import net.helenus.support.HelenusMappingException;
public final class MapJavaType extends AbstractJavaType {
public final class MapJavaType extends AbstractCollectionJavaType {
@Override
public Class<?> getJavaClass() {

View file

@ -35,7 +35,7 @@ import net.helenus.mapping.type.UDTSetDataType;
import net.helenus.support.Either;
import net.helenus.support.HelenusMappingException;
public final class SetJavaType extends AbstractJavaType {
public final class SetJavaType extends AbstractCollectionJavaType {
@Override
public Class<?> getJavaClass() {

View file

@ -0,0 +1,13 @@
package net.helenus.mapping.type;
import net.helenus.mapping.ColumnType;
public abstract class AbstractCollectionDataType extends AbstractDataType {
public AbstractCollectionDataType(ColumnType columnType) {
super(columnType);
}
public boolean isCollectionType() { return true; }
}

View file

@ -54,4 +54,7 @@ public abstract class AbstractDataType {
throw new HelenusMappingException(
"wrong column type " + columnType + " for UserDefinedType in columnName " + columnName);
}
public boolean isCollectionType() { return false; }
}

View file

@ -34,6 +34,7 @@ public final class DTDataType extends AbstractDataType {
private final DataType dataType;
private final Class<?> javaClass;
private final Class<?>[] typeArguments;
private final boolean isCollectionType;
public DTDataType(ColumnType columnType, DataType dataType) {
this(
@ -53,6 +54,7 @@ public final class DTDataType extends AbstractDataType {
this.dataType = dataType;
this.javaClass = javaClass;
this.typeArguments = typeArguments;
this.isCollectionType = dataType.isCollection();
}
public static DTDataType list(
@ -193,6 +195,10 @@ public final class DTDataType extends AbstractDataType {
return null;
}
public boolean isCollectionType() {
return isCollectionType;
}
@Override
public String toString() {
return dataType.toString();

View file

@ -18,12 +18,14 @@ package net.helenus.mapping.type;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.UserType;
import com.datastax.driver.core.schemabuilder.*;
import java.util.AbstractCollection;
import java.util.List;
import net.helenus.mapping.ColumnType;
import net.helenus.mapping.IdentityName;
import net.helenus.support.HelenusMappingException;
public final class UDTKeyMapDataType extends AbstractDataType {
public final class UDTKeyMapDataType extends AbstractCollectionDataType {
private final IdentityName keyType;
private final Class<?> udtKeyClass;

View file

@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType;
import net.helenus.mapping.IdentityName;
import net.helenus.support.HelenusMappingException;
public final class UDTListDataType extends AbstractDataType {
public final class UDTListDataType extends AbstractCollectionDataType {
private final IdentityName udtName;
private final Class<?> udtClass;

View file

@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType;
import net.helenus.mapping.IdentityName;
import net.helenus.support.HelenusMappingException;
public final class UDTMapDataType extends AbstractDataType {
public final class UDTMapDataType extends AbstractCollectionDataType {
private final IdentityName keyType;
private final Class<?> udtKeyClass;

View file

@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType;
import net.helenus.mapping.IdentityName;
import net.helenus.support.HelenusMappingException;
public final class UDTSetDataType extends AbstractDataType {
public final class UDTSetDataType extends AbstractCollectionDataType {
private final IdentityName udtName;
private final Class<?> udtClass;

View file

@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType;
import net.helenus.mapping.IdentityName;
import net.helenus.support.HelenusMappingException;
public final class UDTValueMapDataType extends AbstractDataType {
public final class UDTValueMapDataType extends AbstractCollectionDataType {
private final DataType keyType;
private final IdentityName valueType;

View file

@ -17,9 +17,11 @@ package net.helenus.test.integration.core.simple;
import static net.helenus.core.Query.eq;
import com.datastax.driver.core.ResultSet;
import net.helenus.core.Helenus;
import net.helenus.core.HelenusSession;
import net.helenus.core.Operator;
import net.helenus.core.operation.UpdateOperation;
import net.helenus.core.reflect.Drafted;
import net.helenus.mapping.HelenusEntity;
import net.helenus.support.Fun;
@ -220,13 +222,13 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
// INSERT
session
.update()
.set(user::name, null)
.set(user::age, null)
.set(user::type, null)
.where(user::id, eq(100L))
.zipkinContext(null)
.sync();
.update()
.set(user::name, null)
.set(user::age, null)
.set(user::type, null)
.where(user::id, eq(100L))
.zipkinContext(null)
.sync();
Fun.Tuple3<String, Integer, UserType> tuple =
session
@ -248,6 +250,25 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest {
Assert.assertEquals(0L, cnt);
}
public void testZipkin() throws Exception {
session
.update()
.set(user::name, null)
.set(user::age, null)
.set(user::type, null)
.where(user::id, eq(100L))
.zipkinContext(null)
.sync();
UpdateOperation<ResultSet> update = session.update();
update
.set(user::name, null)
.zipkinContext(null)
.sync();
}
private void assertUsers(User expected, User actual) {
Assert.assertEquals(expected.id(), actual.id());
Assert.assertEquals(expected.name(), actual.name());

View file

@ -62,14 +62,13 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
@Test
public void testSelectAfterInsertProperlyCachesEntity() throws Exception {
/*
Widget w1, w2, w3, w4;
UUID key = UUIDs.timeBased();
try (UnitOfWork uow = session.begin()) {
// This should cache the inserted Widget.
w1 = session.<Widget>upsert(widget)
w1 = session.<Widget>insert(widget)
.value(widget::id, key)
.value(widget::name, RandomString.make(20))
.sync(uow);
@ -106,7 +105,6 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest {
Assert.assertNotEquals(w1, w4);
Assert.assertTrue(w1.equals(w4));
*/
}
}