diff --git a/pom.xml b/pom.xml index 1395c45..b3d96c8 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 net.helenus helenus-core - 2.0.26-SNAPSHOT + 2.0.27-SNAPSHOT jar helenus diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 0acc5f8..403ddd0 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -1,8 +1,10 @@ package net.helenus.core; +import com.datastax.driver.core.ResultSet; import com.diffplug.common.base.Errors; import com.google.common.collect.TreeTraverser; import net.helenus.core.operation.AbstractCache; +import net.helenus.core.operation.CacheKey; import net.helenus.core.operation.UnitOfWorkCache; import net.helenus.support.HelenusException; @@ -16,6 +18,7 @@ public final class UnitOfWork implements AutoCloseable { private final HelenusSession session; private final UnitOfWork parent; private List postCommit = new ArrayList(); + private final Map cache = new HashMap<>(); private boolean aborted = false; private boolean committed = false; @@ -56,6 +59,10 @@ public final class UnitOfWork implements AutoCloseable { } } + public UnitOfWork getEnclosingUnitOfWork() { return parent; } + + public Map getCache() { return cache; } + private Iterator getChildNodes() { return nested.iterator(); } @@ -85,6 +92,8 @@ public final class UnitOfWork implements AutoCloseable { committed = true; aborted = false; + // TODO(gburd): union this cache with parent's (if there is a parent) or with the session cache for all cacheable entities we currently hold + nested.forEach((uow) -> Errors.rethrow().wrap(uow::commit)); // Apply all post-commit functions for diff --git a/src/main/java/net/helenus/core/operation/AbstractCache.java b/src/main/java/net/helenus/core/operation/AbstractCache.java index 5ad2f5f..b546c4e 100644 --- a/src/main/java/net/helenus/core/operation/AbstractCache.java +++ b/src/main/java/net/helenus/core/operation/AbstractCache.java @@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; public abstract class AbstractCache { final Logger logger = LoggerFactory.getLogger(getClass()); - protected Cache cache; + public Cache cache; public AbstractCache() { RemovalListener listener = @@ -38,8 +38,11 @@ public abstract class AbstractCache { .build(); } - protected abstract ResultSet apply( - Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException; + V get(K key) { + return cache.getIfPresent(key); + } + void put(K key, V value) { + cache.put(key, value); + } } diff --git a/src/main/java/net/helenus/core/operation/AbstractFilterOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractFilterOptionalOperation.java index 4020f55..98847c9 100644 --- a/src/main/java/net/helenus/core/operation/AbstractFilterOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractFilterOptionalOperation.java @@ -15,15 +15,19 @@ */ package net.helenus.core.operation; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; + import net.helenus.core.*; +import net.helenus.mapping.HelenusProperty; public abstract class AbstractFilterOptionalOperation< E, O extends AbstractFilterOptionalOperation> extends AbstractOptionalOperation { - protected List> filters = null; + protected Map> filters = null; protected List> ifFilters = null; public AbstractFilterOptionalOperation(AbstractSessionOperations sessionOperations) { @@ -95,9 +99,9 @@ public abstract class AbstractFilterOptionalOperation< private void addFilter(Filter filter) { if (filters == null) { - filters = new LinkedList>(); + filters = new LinkedHashMap>(); } - filters.add(filter); + filters.put(filter.getNode().getProperty(), filter); } private void addIfFilter(Filter filter) { diff --git a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java index b749484..e15808a 100644 --- a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java @@ -15,15 +15,19 @@ */ package net.helenus.core.operation; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; + import net.helenus.core.*; +import net.helenus.mapping.HelenusProperty; public abstract class AbstractFilterStreamOperation< E, O extends AbstractFilterStreamOperation> extends AbstractStreamOperation { - protected List> filters = null; + protected Map> filters = null; protected List> ifFilters = null; public AbstractFilterStreamOperation(AbstractSessionOperations sessionOperations) { @@ -95,9 +99,9 @@ public abstract class AbstractFilterStreamOperation< private void addFilter(Filter filter) { if (filters == null) { - filters = new LinkedList>(); + filters = new LinkedHashMap>(); } - filters.add(filter); + filters.put(filter.getNode().getProperty(), filter); } private void addIfFilter(Filter filter) { diff --git a/src/main/java/net/helenus/core/operation/AbstractOperation.java b/src/main/java/net/helenus/core/operation/AbstractOperation.java index 9b091cb..4ecd05f 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOperation.java @@ -25,7 +25,7 @@ public abstract class AbstractOperation> public abstract E transform(ResultSet resultSet); - protected AbstractCache getCache() { + public AbstractCache getCache() { return null; } @@ -46,21 +46,19 @@ public abstract class AbstractOperation> } public E sync() { - return Executioner.INSTANCE.sync( - sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.sync(sessionOps, null, traceContext, this, showValues); } public E sync(UnitOfWork uow) { - return Executioner.INSTANCE.sync( - sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.sync(sessionOps, uow, traceContext, this, showValues); } public CompletableFuture async() { - return Executioner.INSTANCE.async( - sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + AbstractCache cache = getCache(); + boolean cacheResult = cache != null; + return Executioner.INSTANCE.async(sessionOps, null, traceContext, this, showValues); } public CompletableFuture async(UnitOfWork uow) { - return Executioner.INSTANCE.async( - sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.async(sessionOps, uow, traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 5e19cd7..860fb00 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -28,13 +28,22 @@ import net.helenus.core.UnitOfWork; public abstract class AbstractOptionalOperation> extends AbstractStatementOperation implements OperationsDelegate> { + private Function, E> extractor = new Function, E>() { + + @Override + public E apply(Optional e) { + return e.orElse(null); + } + }; + + public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { super(sessionOperations); } public abstract Optional transform(ResultSet resultSet); - protected AbstractCache getCache() { return null; } + public AbstractCache getCache() { return null; } public CacheKey getCacheKey() { return null; @@ -57,23 +66,19 @@ public abstract class AbstractOptionalOperation sync() { - return Executioner.INSTANCE.>sync( - sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>sync(sessionOps, null, extractor, traceContext, this, showValues); } public Optional sync(UnitOfWork uow) { - return Executioner.INSTANCE.>sync( - sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>sync(sessionOps, uow, extractor, traceContext, this, showValues); } public CompletableFuture> async() { - return Executioner.INSTANCE.>async( - sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>async(sessionOps, null, extractor, traceContext, this, showValues); } public CompletableFuture> async(UnitOfWork uow) { - return Executioner.INSTANCE.>async( - sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>async(sessionOps, uow, extractor, traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index 5658683..04e31ee 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -38,7 +38,7 @@ public abstract class AbstractStatementOperation prepareStatementAsync() { - Statement statement = buildStatement(); + Statement statement = buildStatement(true); if (statement instanceof RegularStatement) { diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 8a3e81a..47df66b 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -34,7 +34,7 @@ public abstract class AbstractStreamOperation transform(ResultSet resultSet); - protected AbstractCache getCache() { + public AbstractCache getCache() { return null; } @@ -59,22 +59,18 @@ public abstract class AbstractStreamOperation sync() { - return Executioner.INSTANCE.>sync( - sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>sync(sessionOps, null, traceContext, this, showValues); } public Stream sync(UnitOfWork uow) { - return Executioner.INSTANCE.>sync( - sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>sync(sessionOps, uow, traceContext, this, showValues); } public CompletableFuture> async() { - return Executioner.INSTANCE.>async( - sessionOps, null, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>async(sessionOps, null, traceContext, this, showValues); } public CompletableFuture> async(UnitOfWork uow) { - return Executioner.INSTANCE.>async( - sessionOps, uow, options(buildStatement()), getCache(), traceContext, this, showValues); + return Executioner.INSTANCE.>async(sessionOps, uow, traceContext, this, showValues); } } diff --git a/src/main/java/net/helenus/core/operation/BoundOperation.java b/src/main/java/net/helenus/core/operation/BoundOperation.java index 84d4f38..28c134f 100644 --- a/src/main/java/net/helenus/core/operation/BoundOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundOperation.java @@ -36,7 +36,7 @@ public final class BoundOperation extends AbstractOperation } @Override - protected AbstractCache getCache() { + public AbstractCache getCache() { return delegate.getCache(); } @@ -49,7 +49,7 @@ public final class BoundOptionalOperation } @Override - public Statement buildStatement() { + public Statement buildStatement(boolean cached) { return boundStatement; } } diff --git a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java index 2cbf534..8923877 100644 --- a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java @@ -36,7 +36,7 @@ public final class BoundStreamOperation } @Override - protected AbstractCache getCache() { + public AbstractCache getCache() { return delegate.getCache(); } @@ -51,7 +51,7 @@ public final class BoundStreamOperation } @Override - public Statement buildStatement() { + public Statement buildStatement(boolean cached) { return boundStatement; } } diff --git a/src/main/java/net/helenus/core/operation/CacheKey.java b/src/main/java/net/helenus/core/operation/CacheKey.java index 43545d1..74e963e 100644 --- a/src/main/java/net/helenus/core/operation/CacheKey.java +++ b/src/main/java/net/helenus/core/operation/CacheKey.java @@ -1,20 +1,19 @@ package net.helenus.core.operation; import com.datastax.driver.core.Statement; +import net.helenus.mapping.HelenusEntity; import java.io.Serializable; public class CacheKey implements Serializable { private String key; - - static String of(Statement statement) { - return "use " + statement.getKeyspace() + "; " + statement.toString(); - } + private HelenusEntity entity; CacheKey() {} - CacheKey(String key) { + CacheKey(HelenusEntity entity, String key) { + this.entity = entity; this.key = key; } @@ -23,7 +22,7 @@ public class CacheKey implements Serializable { } public String toString() { - return key; + return entity.getName() + "." + key; } @Override diff --git a/src/main/java/net/helenus/core/operation/CountOperation.java b/src/main/java/net/helenus/core/operation/CountOperation.java index cdd8eef..b751cfb 100644 --- a/src/main/java/net/helenus/core/operation/CountOperation.java +++ b/src/main/java/net/helenus/core/operation/CountOperation.java @@ -40,7 +40,7 @@ public final class CountOperation extends AbstractFilterOperation addPropertyNode(f.getNode())); diff --git a/src/main/java/net/helenus/core/operation/DeleteOperation.java b/src/main/java/net/helenus/core/operation/DeleteOperation.java index 8c62692..7b6dffb 100644 --- a/src/main/java/net/helenus/core/operation/DeleteOperation.java +++ b/src/main/java/net/helenus/core/operation/DeleteOperation.java @@ -46,7 +46,7 @@ public final class DeleteOperation extends AbstractFilterOperation addPropertyNode(f.getNode())); diff --git a/src/main/java/net/helenus/core/operation/Executioner.java b/src/main/java/net/helenus/core/operation/Executioner.java index 7a2e717..4eea941 100644 --- a/src/main/java/net/helenus/core/operation/Executioner.java +++ b/src/main/java/net/helenus/core/operation/Executioner.java @@ -6,79 +6,93 @@ import brave.propagation.TraceContext; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Statement; + +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Function; + import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; + public enum Executioner { INSTANCE; - E sync( - AbstractSessionOperations session, - UnitOfWork uow, - Statement statement, - AbstractCache cache, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); - return this.execute( - futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues); + E sync(AbstractSessionOperations session, UnitOfWork uow, Function extractor, + TraceContext traceContext, OperationsDelegate delegate, boolean showValues) { + return this.execute(session, uow, traceContext, delegate, showValues); } - public CompletableFuture async( - AbstractSessionOperations session, - UnitOfWork uow, - Statement statement, - AbstractCache cache, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { - - ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); - return CompletableFuture.supplyAsync( - () -> - execute( - futureResultSet, session, uow, statement, cache, traceContext, delegate, showValues)); + public CompletableFuture async(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, + OperationsDelegate delegate, boolean showValues) { + return CompletableFuture.supplyAsync(() -> execute(session, uow, traceContext, delegate, showValues)); } - public E execute( - ResultSetFuture futureResultSet, - AbstractSessionOperations session, - UnitOfWork uow, - Statement statement, - AbstractCache cache, - TraceContext traceContext, - OperationsDelegate delegate, - boolean showValues) { + public E execute(AbstractSessionOperations session, UnitOfWork uow, TraceContext traceContext, + OperationsDelegate delegate, boolean showValues) { + // Start recording in a Zipkin sub-span our execution time to perform this operation. Tracer tracer = session.getZipkinTracer(); Span span = null; if (tracer != null && traceContext != null) { span = tracer.newChild(traceContext); } - if (uow != null) { - cache = uow.getCacheEnclosing(cache); - } - try { if (span != null) { span.name("cassandra"); span.start(); } - ResultSet resultSet; - if (cache != null) { - resultSet = cache.apply(statement, delegate, futureResultSet); + // Determine if we are caching and if so where to put the results. + AbstractCache cache = null; + boolean prepareStatementForCaching = false; + if (uow != null ) { + prepareStatementForCaching = true; + cache = uow.getCacheEnclosing(delegate.getCache()); } else { - resultSet = futureResultSet.get(); + cache = delegate.getCache(); + prepareStatementForCaching = cache != null; } - E result = delegate.transform(resultSet); + // TODO: first, ask the delegate for the cacheKey + // if this is a SELECT query: + // if not in cache build the statement, execute the future, cache the result, transform the result then cache the transformations + // if INSERT/UPSERT/UPDATE + // if DELETE + // if COUNT + CacheKey key = (cache == null) ? null : delegate.getCacheKey(); + E result = null; + + if (key != null && cache != null) { + // Right now we only support Optional fetch via complete primary key. + Object value = cache.get(key); + if (value != null) { + result = (E) Optional.of(value); + // if log statements: log cache hit for entity, primary key + // metrics.cacheHit +1 + } else { + // if log statements: log cache miss for entity, primary key + // metrics.cacheMiss +1 + } + } + + ResultSet resultSet = null; + if (result == null) { + Statement statement = delegate.options(delegate.buildStatement(prepareStatementForCaching)); + // if log statements... log it here. + ResultSetFuture futureResultSet = session.executeAsync(statement, showValues); + resultSet = futureResultSet.get(); + } + result = delegate.transform(resultSet); + + if (cache != null) { + updateCache.apply(result, cache); + } + + return (E) result; - return result; } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } finally { @@ -87,4 +101,5 @@ public enum Executioner { } } } + } diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 17fbef9..477e8bb 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -120,7 +120,7 @@ public final class InsertOperation extends AbstractOperation addPropertyNode(t._1)); diff --git a/src/main/java/net/helenus/core/operation/OperationsDelegate.java b/src/main/java/net/helenus/core/operation/OperationsDelegate.java index eb3a7f2..754d88b 100644 --- a/src/main/java/net/helenus/core/operation/OperationsDelegate.java +++ b/src/main/java/net/helenus/core/operation/OperationsDelegate.java @@ -1,9 +1,18 @@ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Statement; + + +interface OperationsDelegate { + + Statement buildStatement(boolean cached); + + Statement options(Statement statement); -public interface OperationsDelegate { E transform(ResultSet resultSet); + AbstractCache getCache(); + CacheKey getCacheKey(); } diff --git a/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java b/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java index d070c72..5616ae7 100644 --- a/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/PreparedStreamOperation.java @@ -44,7 +44,7 @@ public final class PreparedStreamOperation { key = key.replaceFirst(Pattern.quote("?"), param.toString()); } - return new BoundStreamOperation(boundStatement, new CacheKey(key), operation); + return new BoundStreamOperation(boundStatement, operation.getCacheKey(), operation); } @Override diff --git a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java index 3716a34..3805556 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java @@ -38,8 +38,18 @@ public final class SelectFirstOperation } @Override - public BuiltStatement buildStatement() { - return src.buildStatement(); + public AbstractCache getCache() { + return src.getCache(); + } + + @Override + public CacheKey getCacheKey() { + return src.getCacheKey(); + } + + @Override + public BuiltStatement buildStatement(boolean cached) { + return src.buildStatement(cached); } @Override diff --git a/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java index 87e4333..c62db99 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java @@ -36,8 +36,8 @@ public final class SelectFirstTransformingOperation } @Override - public BuiltStatement buildStatement() { - return src.buildStatement(); + public BuiltStatement buildStatement(boolean cached) { + return src.buildStatement(cached); } @Override diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index a1a16a7..a65fe8e 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -27,6 +27,9 @@ import java.util.*; import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; import net.helenus.core.*; import net.helenus.core.reflect.HelenusPropertyNode; import net.helenus.mapping.HelenusEntity; @@ -166,13 +169,6 @@ public final class SelectOperation extends AbstractFilterStreamOperation ignoreCache() { ignoreSessionCache = true; return this; @@ -189,17 +185,53 @@ public final class SelectOperation extends AbstractFilterStreamOperationkeys = new ArrayList<>(filters.size()); + HelenusEntity entity = props.get(0).getEntity(); + + for (HelenusPropertyNode prop : props) { + switch(prop.getProperty().getColumnType()) { + case PARTITION_KEY: + case CLUSTERING_COLUMN: + + Filter filter = filters.get(prop.getProperty()); + if (filter != null) { + keys.add(filter.toString()); + } else { + // we're missing a part of the primary key, so we can't create a proper cache key + return null; + } + break; + default: + // We've past the primary key components in this ordered list, so we're done building + // the cache key. + if (keys.size() > 0) { + return new CacheKey(entity, Joiner.on(",").join(keys)); + } + return null; + } + } + return null; + } + + @Override + public BuiltStatement buildStatement(boolean cached) { HelenusEntity entity = null; Selection selection = QueryBuilder.select(); - // iff in txn or cacheable add ttl and timestamp to result set for each col selected - // construct set of primary keys (partition and col) - // construct cache key - for (HelenusPropertyNode prop : props) { - selection = selection.column(prop.getColumnName()); + String columnName = prop.getColumnName(); + selection = selection.column(columnName); if (prop.getProperty().caseSensitiveIndex()) { allowFiltering = true; @@ -209,10 +241,29 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation filter : filters) { + for (Filter filter : filters.values()) { where.and(filter.getClause(sessionOps.getValuePreparer())); } } diff --git a/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java b/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java index 4f19e7a..a0d0b2e 100644 --- a/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectTransformingOperation.java @@ -36,12 +36,12 @@ public final class SelectTransformingOperation } @Override - public BuiltStatement buildStatement() { - return src.buildStatement(); + public BuiltStatement buildStatement(boolean cached) { + return src.buildStatement(cached); } @Override - protected AbstractCache getCache() { + public AbstractCache getCache() { return src.getCache(); } diff --git a/src/main/java/net/helenus/core/operation/SessionCache.java b/src/main/java/net/helenus/core/operation/SessionCache.java index 9c3734d..2c1ec22 100644 --- a/src/main/java/net/helenus/core/operation/SessionCache.java +++ b/src/main/java/net/helenus/core/operation/SessionCache.java @@ -6,31 +6,20 @@ import com.datastax.driver.core.Statement; import com.google.common.cache.Cache; import java.util.concurrent.ExecutionException; -public class SessionCache extends AbstractCache { +public class SessionCache extends AbstractCache { - protected ResultSet apply( - Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) + protected ResultSet apply(CacheKey key, OperationsDelegate delegate) throws InterruptedException, ExecutionException { - final CacheKey key = delegate.getCacheKey(); - final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); + + ResultSet resultSet = null; - if (cacheKey == null) { - resultSet = resultSetFuture.get(); - } else { - resultSet = cache.getIfPresent(cacheKey); - if (resultSet == null) { - resultSet = resultSetFuture.get(); - if (resultSet != null) { - cache.put(cacheKey, resultSet); - } - } + resultSet = cache.getIfPresent(key); + + if (resultSet != null) { + cache.put(key, resultSet); } + return resultSet; } - public ResultSet get(Statement statement, OperationsDelegate delegate) { - final CacheKey key = delegate.getCacheKey(); - final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); - return cache.getIfPresent(cacheKey); - } } diff --git a/src/main/java/net/helenus/core/operation/UnitOfWorkCache.java b/src/main/java/net/helenus/core/operation/UnitOfWorkCache.java index 5e2e344..828f14b 100644 --- a/src/main/java/net/helenus/core/operation/UnitOfWorkCache.java +++ b/src/main/java/net/helenus/core/operation/UnitOfWorkCache.java @@ -1,19 +1,13 @@ package net.helenus.core.operation; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Statement; +import java.util.Optional; + import net.helenus.core.UnitOfWork; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -public class UnitOfWorkCache extends AbstractCache { +public class UnitOfWorkCache extends AbstractCache { private final UnitOfWork uow; - private final Map cache = new HashMap(); - private AbstractCache sessionCache; + private AbstractCache sessionCache; public UnitOfWorkCache(UnitOfWork uow, AbstractCache sessionCache) { super(); @@ -22,32 +16,22 @@ public class UnitOfWorkCache extends AbstractCache { } @Override - protected ResultSet apply(Statement statement, OperationsDelegate delegate, ResultSetFuture resultSetFuture) - throws InterruptedException, ExecutionException { - return resultSetFuture.get(); - /* - final CacheKey key = delegate.getCacheKey(); - final String cacheKey = (key == null) ? CacheKey.of(statement) : key.toString(); - ResultSet resultSet = null; - if (cacheKey == null) { - if (sessionCache != null) { - ResultSet rs = sessionCache.apply(statement, delegate, resultSetFuture); - if (rs != null) { - return rs; - } - } - } else { - resultSet = cache.get(cacheKey); - if (resultSet != null) { - return resultSet; - } + Object get(CacheKey key) { + Object result = null; + UnitOfWork parent = null; + do { + result = uow.getCache().get(key); + parent = uow.getEnclosingUnitOfWork(); + } while(result == null && parent != null); + if (result == null) { + result = sessionCache.get(key); } - resultSet = resultSetFuture.get(); - if (resultSet != null) { - cache.put(cacheKey, resultSet); - } - return resultSet; - */ + return result; + } + + @Override + void put(CacheKey key, Object result) { + cache.put(key, result); } } diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index 701ca2b..40a019b 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -507,7 +507,7 @@ public final class UpdateOperation extends AbstractFilterOperation getJavaClass(); public boolean isApplicable(Class javaClass) { diff --git a/src/main/java/net/helenus/mapping/javatype/ListJavaType.java b/src/main/java/net/helenus/mapping/javatype/ListJavaType.java index b15e0fa..8e1f937 100644 --- a/src/main/java/net/helenus/mapping/javatype/ListJavaType.java +++ b/src/main/java/net/helenus/mapping/javatype/ListJavaType.java @@ -35,7 +35,7 @@ import net.helenus.mapping.type.UDTListDataType; import net.helenus.support.Either; import net.helenus.support.HelenusMappingException; -public final class ListJavaType extends AbstractJavaType { +public final class ListJavaType extends AbstractCollectionJavaType { @Override public Class getJavaClass() { diff --git a/src/main/java/net/helenus/mapping/javatype/MapJavaType.java b/src/main/java/net/helenus/mapping/javatype/MapJavaType.java index 8952419..4d6e38f 100644 --- a/src/main/java/net/helenus/mapping/javatype/MapJavaType.java +++ b/src/main/java/net/helenus/mapping/javatype/MapJavaType.java @@ -18,6 +18,7 @@ package net.helenus.mapping.javatype; import com.datastax.driver.core.*; import java.lang.reflect.Method; import java.lang.reflect.Type; +import java.util.AbstractCollection; import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -31,7 +32,7 @@ import net.helenus.mapping.type.*; import net.helenus.support.Either; import net.helenus.support.HelenusMappingException; -public final class MapJavaType extends AbstractJavaType { +public final class MapJavaType extends AbstractCollectionJavaType { @Override public Class getJavaClass() { diff --git a/src/main/java/net/helenus/mapping/javatype/SetJavaType.java b/src/main/java/net/helenus/mapping/javatype/SetJavaType.java index 6cdd5c1..818d862 100644 --- a/src/main/java/net/helenus/mapping/javatype/SetJavaType.java +++ b/src/main/java/net/helenus/mapping/javatype/SetJavaType.java @@ -35,7 +35,7 @@ import net.helenus.mapping.type.UDTSetDataType; import net.helenus.support.Either; import net.helenus.support.HelenusMappingException; -public final class SetJavaType extends AbstractJavaType { +public final class SetJavaType extends AbstractCollectionJavaType { @Override public Class getJavaClass() { diff --git a/src/main/java/net/helenus/mapping/type/AbstractCollectionDataType.java b/src/main/java/net/helenus/mapping/type/AbstractCollectionDataType.java new file mode 100644 index 0000000..d138263 --- /dev/null +++ b/src/main/java/net/helenus/mapping/type/AbstractCollectionDataType.java @@ -0,0 +1,13 @@ +package net.helenus.mapping.type; + +import net.helenus.mapping.ColumnType; + +public abstract class AbstractCollectionDataType extends AbstractDataType { + + public AbstractCollectionDataType(ColumnType columnType) { + super(columnType); + } + + public boolean isCollectionType() { return true; } + +} diff --git a/src/main/java/net/helenus/mapping/type/AbstractDataType.java b/src/main/java/net/helenus/mapping/type/AbstractDataType.java index 698f5b6..8be0fd1 100644 --- a/src/main/java/net/helenus/mapping/type/AbstractDataType.java +++ b/src/main/java/net/helenus/mapping/type/AbstractDataType.java @@ -54,4 +54,7 @@ public abstract class AbstractDataType { throw new HelenusMappingException( "wrong column type " + columnType + " for UserDefinedType in columnName " + columnName); } + + public boolean isCollectionType() { return false; } + } diff --git a/src/main/java/net/helenus/mapping/type/DTDataType.java b/src/main/java/net/helenus/mapping/type/DTDataType.java index bb18480..e61d171 100644 --- a/src/main/java/net/helenus/mapping/type/DTDataType.java +++ b/src/main/java/net/helenus/mapping/type/DTDataType.java @@ -34,6 +34,7 @@ public final class DTDataType extends AbstractDataType { private final DataType dataType; private final Class javaClass; private final Class[] typeArguments; + private final boolean isCollectionType; public DTDataType(ColumnType columnType, DataType dataType) { this( @@ -53,6 +54,7 @@ public final class DTDataType extends AbstractDataType { this.dataType = dataType; this.javaClass = javaClass; this.typeArguments = typeArguments; + this.isCollectionType = dataType.isCollection(); } public static DTDataType list( @@ -193,6 +195,10 @@ public final class DTDataType extends AbstractDataType { return null; } + public boolean isCollectionType() { + return isCollectionType; + } + @Override public String toString() { return dataType.toString(); diff --git a/src/main/java/net/helenus/mapping/type/UDTKeyMapDataType.java b/src/main/java/net/helenus/mapping/type/UDTKeyMapDataType.java index b36fde2..b9724aa 100644 --- a/src/main/java/net/helenus/mapping/type/UDTKeyMapDataType.java +++ b/src/main/java/net/helenus/mapping/type/UDTKeyMapDataType.java @@ -18,12 +18,14 @@ package net.helenus.mapping.type; import com.datastax.driver.core.DataType; import com.datastax.driver.core.UserType; import com.datastax.driver.core.schemabuilder.*; + +import java.util.AbstractCollection; import java.util.List; import net.helenus.mapping.ColumnType; import net.helenus.mapping.IdentityName; import net.helenus.support.HelenusMappingException; -public final class UDTKeyMapDataType extends AbstractDataType { +public final class UDTKeyMapDataType extends AbstractCollectionDataType { private final IdentityName keyType; private final Class udtKeyClass; diff --git a/src/main/java/net/helenus/mapping/type/UDTListDataType.java b/src/main/java/net/helenus/mapping/type/UDTListDataType.java index 09f2bb9..21b2956 100644 --- a/src/main/java/net/helenus/mapping/type/UDTListDataType.java +++ b/src/main/java/net/helenus/mapping/type/UDTListDataType.java @@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType; import net.helenus.mapping.IdentityName; import net.helenus.support.HelenusMappingException; -public final class UDTListDataType extends AbstractDataType { +public final class UDTListDataType extends AbstractCollectionDataType { private final IdentityName udtName; private final Class udtClass; diff --git a/src/main/java/net/helenus/mapping/type/UDTMapDataType.java b/src/main/java/net/helenus/mapping/type/UDTMapDataType.java index bdef905..d672fa9 100644 --- a/src/main/java/net/helenus/mapping/type/UDTMapDataType.java +++ b/src/main/java/net/helenus/mapping/type/UDTMapDataType.java @@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType; import net.helenus.mapping.IdentityName; import net.helenus.support.HelenusMappingException; -public final class UDTMapDataType extends AbstractDataType { +public final class UDTMapDataType extends AbstractCollectionDataType { private final IdentityName keyType; private final Class udtKeyClass; diff --git a/src/main/java/net/helenus/mapping/type/UDTSetDataType.java b/src/main/java/net/helenus/mapping/type/UDTSetDataType.java index f9bac92..72ffcb0 100644 --- a/src/main/java/net/helenus/mapping/type/UDTSetDataType.java +++ b/src/main/java/net/helenus/mapping/type/UDTSetDataType.java @@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType; import net.helenus.mapping.IdentityName; import net.helenus.support.HelenusMappingException; -public final class UDTSetDataType extends AbstractDataType { +public final class UDTSetDataType extends AbstractCollectionDataType { private final IdentityName udtName; private final Class udtClass; diff --git a/src/main/java/net/helenus/mapping/type/UDTValueMapDataType.java b/src/main/java/net/helenus/mapping/type/UDTValueMapDataType.java index 1208a3e..48683c2 100644 --- a/src/main/java/net/helenus/mapping/type/UDTValueMapDataType.java +++ b/src/main/java/net/helenus/mapping/type/UDTValueMapDataType.java @@ -23,7 +23,7 @@ import net.helenus.mapping.ColumnType; import net.helenus.mapping.IdentityName; import net.helenus.support.HelenusMappingException; -public final class UDTValueMapDataType extends AbstractDataType { +public final class UDTValueMapDataType extends AbstractCollectionDataType { private final DataType keyType; private final IdentityName valueType; diff --git a/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java b/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java index 82c48b4..9773ad1 100644 --- a/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java +++ b/src/test/java/net/helenus/test/integration/core/simple/SimpleUserTest.java @@ -17,9 +17,11 @@ package net.helenus.test.integration.core.simple; import static net.helenus.core.Query.eq; +import com.datastax.driver.core.ResultSet; import net.helenus.core.Helenus; import net.helenus.core.HelenusSession; import net.helenus.core.Operator; +import net.helenus.core.operation.UpdateOperation; import net.helenus.core.reflect.Drafted; import net.helenus.mapping.HelenusEntity; import net.helenus.support.Fun; @@ -220,13 +222,13 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { // INSERT session - .update() - .set(user::name, null) - .set(user::age, null) - .set(user::type, null) - .where(user::id, eq(100L)) - .zipkinContext(null) - .sync(); + .update() + .set(user::name, null) + .set(user::age, null) + .set(user::type, null) + .where(user::id, eq(100L)) + .zipkinContext(null) + .sync(); Fun.Tuple3 tuple = session @@ -248,6 +250,25 @@ public class SimpleUserTest extends AbstractEmbeddedCassandraTest { Assert.assertEquals(0L, cnt); } + public void testZipkin() throws Exception { + session + .update() + .set(user::name, null) + .set(user::age, null) + .set(user::type, null) + .where(user::id, eq(100L)) + .zipkinContext(null) + .sync(); + + + UpdateOperation update = session.update(); + update + .set(user::name, null) + .zipkinContext(null) + .sync(); + + } + private void assertUsers(User expected, User actual) { Assert.assertEquals(expected.id(), actual.id()); Assert.assertEquals(expected.name(), actual.name()); diff --git a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java index 99c1547..9137ea1 100644 --- a/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java +++ b/src/test/java/net/helenus/test/integration/core/unitofwork/UnitOfWorkTest.java @@ -62,14 +62,13 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { @Test public void testSelectAfterInsertProperlyCachesEntity() throws Exception { - /* Widget w1, w2, w3, w4; UUID key = UUIDs.timeBased(); try (UnitOfWork uow = session.begin()) { // This should cache the inserted Widget. - w1 = session.upsert(widget) + w1 = session.insert(widget) .value(widget::id, key) .value(widget::name, RandomString.make(20)) .sync(uow); @@ -106,7 +105,6 @@ public class UnitOfWorkTest extends AbstractEmbeddedCassandraTest { Assert.assertNotEquals(w1, w4); Assert.assertTrue(w1.equals(w4)); - */ } }