diff --git a/build.gradle b/build.gradle index 1c1b7de..c18e085 100644 --- a/build.gradle +++ b/build.gradle @@ -64,7 +64,7 @@ dependencies { compile group: 'org.aspectj', name: 'aspectjweaver', version: '1.8.10' compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.6' compile group: 'org.springframework', name: 'spring-core', version: '4.3.10.RELEASE' - + compile group: 'org.ahocorasick', name: 'ahocorasick', version: '0.4.0' compile group: 'com.google.guava', name: 'guava', version: '20.0' compile group: 'com.diffplug.durian', name: 'durian', version: '3.+' compile group: 'io.zipkin.java', name: 'zipkin', version: '1.29.2' diff --git a/helenus-core.iml b/helenus-core.iml index be96637..1f3247f 100644 --- a/helenus-core.iml +++ b/helenus-core.iml @@ -35,6 +35,7 @@ + diff --git a/pom.xml b/pom.xml index 71feca8..8e4391b 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,12 @@ 20.0 + + org.ahocorasick + ahocorasick + 0.4.0 + + io.zipkin.java diff --git a/src/main/java/net/helenus/core/AbstractUnitOfWork.java b/src/main/java/net/helenus/core/AbstractUnitOfWork.java index c621770..3d505dd 100644 --- a/src/main/java/net/helenus/core/AbstractUnitOfWork.java +++ b/src/main/java/net/helenus/core/AbstractUnitOfWork.java @@ -17,15 +17,22 @@ package net.helenus.core; import com.diffplug.common.base.Errors; import com.google.common.collect.TreeTraverser; +import net.helenus.core.cache.EntityIdentifyingFacet; +import net.helenus.support.Either; +import org.ahocorasick.trie.Emit; +import org.ahocorasick.trie.Trie; + import java.util.*; +import java.util.stream.Collectors; /** Encapsulates the concept of a "transaction" as a unit-of-work. */ -public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable { +public abstract class AbstractUnitOfWork implements UnitOfWork, AutoCloseable { private final List> nested = new ArrayList<>(); private final HelenusSession session; private final AbstractUnitOfWork parent; private List postCommit = new ArrayList(); - private final Map> cache = new HashMap>(); + private final Map>> cache = new HashMap>>(); + private Trie cacheIndex = Trie.builder().ignoreOverlaps().onlyWholeWordsWhiteSpaceSeparated().build(); private boolean aborted = false; private boolean committed = false; @@ -36,14 +43,15 @@ public abstract class AbstractUnitOfWork implements UnitOfW this.parent = parent; } - public UnitOfWork addNestedUnitOfWork(UnitOfWork uow) { + @Override + public void addNestedUnitOfWork(UnitOfWork uow) { synchronized (nested) { nested.add((AbstractUnitOfWork) uow); } - return this; } - public UnitOfWork begin() { + @Override + public UnitOfWork begin() { // log.record(txn::start) return this; } @@ -56,20 +64,49 @@ public abstract class AbstractUnitOfWork implements UnitOfW } } - public Set cacheLookup(String key) { - Set r = getCache().get(key); - if (r != null) { - return r; - } else { - if (parent != null) { - return parent.cacheLookup(key); + @Override + public Optional>> cacheLookupByFacet(Set facets) { + Optional>> result = null; + Collection emits = cacheIndex.parseText(String.join(" ", facets.stream() + .map(facet -> facet.toString()).collect(Collectors.toList()))); + for (Emit emit : emits) { + // NOTE: rethink. should this match *all* facets? how do I know which emit keyword is the primary key? + String key = emit.getKeyword(); + result = cacheLookup(key); } - } - return null; + return result; } - public Map> getCache() { - return cache; + @Override + public Optional>> cacheLookupByStatement(String[] statementKeys) { + Optional>> result = null; + String key = String.join(",", statementKeys); + return cacheLookup(key); + } + + @Override + public Optional>> cacheLookup(String key) { + Optional>> result = Optional.of(cache.get(key)); + + if (result.isPresent()) { + return result; + } else { + // Be sure to check all enclosing UnitOfWork caches as well, we may be nested. + if (parent != null) { + return parent.cacheLookup(key); + } + } + return Optional.empty(); + } + + @Override + public void cacheUpdate(Either> value, String[] statementKeys, Set facets) { + String key = String.join(",", statementKeys); + cache.put(key, value); + Trie.TrieBuilder builder = cacheIndex.builder(); + facets.forEach(facet -> { + builder.addKeyword(facet.toString()); + }); } private Iterator> getChildNodes() { @@ -108,18 +145,7 @@ public abstract class AbstractUnitOfWork implements UnitOfW // Merge UOW cache into parent's cache. if (parent != null) { - Map> parentCache = parent.getCache(); - for (String key : cache.keySet()) { - if (parentCache.containsKey(key)) { - // merge the sets - Set ps = parentCache.get(key); - ps.addAll( - cache.get(key)); //TODO(gburd): review this, likely not correct in all cases as-is. - } else { - // add the missing set - parentCache.put(key, cache.get(key)); - } - } + parent.assumeCache(cache, cacheIndex); } // Apply all post-commit functions for @@ -155,6 +181,37 @@ public abstract class AbstractUnitOfWork implements UnitOfW // cache.invalidateSince(txn::start time) } + private void assumeCache(Map>> childCache, Trie childCacheIndex) { + for (String key : childCache.keySet()) { + if (cache.containsKey(key)) { + Either> value = cache.get(key); + if (value.isLeft()) { + Object obj = value.getLeft(); + // merge objects + Either> childValue = childCache.get(key); + if (childValue.isLeft()) { + Object childObj = childValue.getLeft(); + } else { + Set childSet = childValue.getRight(); + } + } else { + // merge the sets + Set set = value.getRight(); + Either> childValue = childCache.get(key); + if (childValue.isLeft()) { + Object childObj = childValue.getLeft(); + set.add(childObj); + } else { + Set childSet = childValue.getRight(); + set.addAll(childSet); + } + } + } else { + cache.put(key, childCache.get(key)); + } + } + } + public String describeConflicts() { return "it's complex..."; } diff --git a/src/main/java/net/helenus/core/UnitOfWork.java b/src/main/java/net/helenus/core/UnitOfWork.java index 6bc630a..da799fc 100644 --- a/src/main/java/net/helenus/core/UnitOfWork.java +++ b/src/main/java/net/helenus/core/UnitOfWork.java @@ -15,10 +15,13 @@ */ package net.helenus.core; -import java.util.Map; +import net.helenus.core.cache.EntityIdentifyingFacet; +import net.helenus.support.Either; + +import java.util.Optional; import java.util.Set; -public interface UnitOfWork extends AutoCloseable { +public interface UnitOfWork extends AutoCloseable { /** * Marks the beginning of a transactional section of work. Will write a record to the shared @@ -26,17 +29,17 @@ public interface UnitOfWork extends AutoCloseable { * * @return the handle used to commit or abort the work. */ - UnitOfWork begin(); + UnitOfWork begin(); - UnitOfWork addNestedUnitOfWork(UnitOfWork uow); + void addNestedUnitOfWork(UnitOfWork uow); /** * Checks to see if the work performed between calling begin and now can be committed or not. * * @return a function from which to chain work that only happens when commit is successful - * @throws E when the work overlaps with other concurrent writers. + * @throws X when the work overlaps with other concurrent writers. */ - PostCommitFunction commit() throws E; + PostCommitFunction commit() throws X; /** * Explicitly abort the work within this unit of work. Any nested aborted unit of work will @@ -48,8 +51,9 @@ public interface UnitOfWork extends AutoCloseable { boolean hasCommitted(); - //Either> cacheLookup(String key); - Set cacheLookup(String key); + Optional>> cacheLookup(String key); + Optional>> cacheLookupByFacet(Set facets); + Optional>> cacheLookupByStatement(String[] statementKeys); + void cacheUpdate(Either> pojo, String[] statementKeys, Set facets); - Map> getCache(); } diff --git a/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java b/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java index b4a7cf9..62b3849 100644 --- a/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java +++ b/src/main/java/net/helenus/core/cache/EntityIdentifyingFacet.java @@ -2,9 +2,27 @@ package net.helenus.core.cache; import net.helenus.mapping.HelenusProperty; +import java.util.Set; + public class EntityIdentifyingFacet extends Facet { public EntityIdentifyingFacet(HelenusProperty prop) {} public EntityIdentifyingFacet(HelenusProperty[]... props) {} + + public boolean isFullyBound() { + return false; + } + + public HelenusProperty getProperty() { + return null; + } + + public Set getUnboundEntityProperties() { + return null; + } + + public void setValueForProperty(HelenusProperty prop, Object value) { + } + } diff --git a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java index b78daf1..9a5bc8c 100644 --- a/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractFilterStreamOperation.java @@ -22,9 +22,7 @@ import java.util.Map; import net.helenus.core.*; import net.helenus.mapping.HelenusProperty; -public abstract class AbstractFilterStreamOperation< - E, O extends AbstractFilterStreamOperation> - extends AbstractStreamOperation { +public abstract class AbstractFilterStreamOperation> extends AbstractStreamOperation { protected Map> filters = null; protected List> ifFilters = null; diff --git a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java index 349469a..e4b9302 100644 --- a/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractOptionalOperation.java @@ -21,164 +21,109 @@ import com.datastax.driver.core.ResultSet; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.HashSet; -import java.util.Map; +import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; +import net.helenus.core.cache.EntityIdentifyingFacet; + import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; -import net.helenus.core.AbstractSessionOperations; -import net.helenus.core.Filter; -import net.helenus.core.Helenus; -import net.helenus.core.UnitOfWork; -import net.helenus.core.cache.EntityIdentifyingFacet; -import net.helenus.mapping.HelenusEntity; -import net.helenus.mapping.HelenusProperty; -import net.helenus.mapping.value.BeanColumnValueProvider; -import net.helenus.support.Either; - -import javax.swing.text.html.parser.Entity; - public abstract class AbstractOptionalOperation> - extends AbstractStatementOperation { + extends AbstractStatementOperation { - public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { - super(sessionOperations); - } - - public abstract Optional transform(ResultSet resultSet); - - public PreparedOptionalOperation prepare() { - return new PreparedOptionalOperation(prepareStatement(), this); - } - - public ListenableFuture> prepareAsync() { - final O _this = (O) this; - return Futures.transform( - prepareStatementAsync(), - new Function>() { - @Override - public PreparedOptionalOperation apply(PreparedStatement preparedStatement) { - return new PreparedOptionalOperation(preparedStatement, _this); - } - }); - } - - public Optional sync() throws TimeoutException { - final Timer.Context context = requestLatency.time(); - try { - ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, false); - return transform(resultSet); - } finally { - context.stop(); + public AbstractOptionalOperation(AbstractSessionOperations sessionOperations) { + super(sessionOperations); } - } - public Optional sync(UnitOfWork uow) throws TimeoutException { - if (uow == null) return sync(); + public abstract Optional transform(ResultSet resultSet); - final Timer.Context context = requestLatency.time(); - try { + public PreparedOptionalOperation prepare() { + return new PreparedOptionalOperation(prepareStatement(), this); + } - Optional result = null; - String stmtKey = null; - if (enableCache) { - Set facets = getIdentifyingFacets(); - if (!facets.isEmpty()) { - for (EntityIdentifyingFacet facet : facets) { - //TODO(gburd): what about select ResultSet, Tuple... etc.? - Optional, E>> optionalCachedResult = uow.cacheLookup(facet.hashCode()); - if (optionalCachedResult.isPresent()) { - uowCacheHits.mark(); - logger.info("UnitOfWork({}) cache hit for facet: {} with key: {}", uow.hashCode(), facet.toString(), facet.hashCode()); - Either, E> eitherCachedResult = optionalCachedResult.get(); - if (eitherCachedResult.isRight()) { - E cachedResult = eitherCachedResult.getRight(); - result = Optional.of(cachedResult); - } - break; - } - } - } else { - // The operation didn't provide enough information to uniquely identify the entity object - // using one of the facets, but that doesn't mean a filtering query won't return a proper - // result. Look in the cache to see if this statement has been executed before. - stmtKey = getStatementCacheKey(); - Optional, E>> optionalCachedResult = uow.cacheLookup(stmtKey.hashCode()); - if (optionalCachedResult.isPresent()) { - Either, E> eitherCachedResult = optionalCachedResult.get(); - if (eitherCachedResult.isLeft()) { - Set cachedResult = eitherCachedResult.getLeft(); - // Ensure that this non-indexed selection uniquely identified an Entity. - if (!(cachedResult.isEmpty() || cachedResult.size() > 1)) { - uowCacheHits.mark(); - logger.info("UnitOfWork({}) cache hit for stmt {} {}", uow.hashCode(), stmtKey, - stmtKey.hashCode()); - result = cachedResult.stream().findFirst(); - } - } - } - } - - if (result == null) { - ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true); - result = transform(resultSet); - - if (enableCache && result.isPresent()) { - // If we executed a query that didn't depend on an we have a stmtKey for the filters, add that to the cache. - if (stmtKey != null) { - Set set = new HashSet(1); - set.add(result.get()); - uow.getCache().put(stmtKey.hashCode(), set); - } - // Now insert this entity into the cache for each facet for this entity that we can fully bind. - E entity = result.get(); - Map facetMap = Helenus.entity(result.get().getClass()).getIdentityFacets(); - facetMap.forEach((facetName, facet) -> { - EntityIdentifyingFacet boundFacet = null; - if (!facet.isFullyBound()) { - boundFacet = new EntityIdentifyingFacet(facet); - for (HelenusProperty prop : facet.getUnboundEntityProperties()) { - Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(entity, -1, prop); - if (value == null) { break; } - boundFacet.setValueForProperty(prop, value); + public ListenableFuture> prepareAsync() { + final O _this = (O) this; + return Futures.transform( + prepareStatementAsync(), + new Function>() { + @Override + public PreparedOptionalOperation apply(PreparedStatement preparedStatement) { + return new PreparedOptionalOperation(preparedStatement, _this); } - } - if (boundFacet != null && boundFacet.isFullyBound()) { - uow.getCache().put(boundFacet.hashCode(), Either) - } - }); - Set set = new HashSet(1); - set.add(result.get()); - uow.getCache().put(key, set); - } else { - uow.getCache().put(key, new HashSet(0)); - } - } - } - - return result; - } finally { - context.stop(); + }); } - } - public CompletableFuture> async() { - return CompletableFuture.>supplyAsync(() -> { + public Optional sync() throws TimeoutException { + final Timer.Context context = requestLatency.time(); try { - return sync(); - } catch (TimeoutException ex) { throw new CompletionException(ex); } - }); - } + ResultSet resultSet = this.execute(sessionOps, null, traceContext, queryExecutionTimeout, queryTimeoutUnits, + showValues, false); + return transform(resultSet); + } finally { + context.stop(); + } + } - public CompletableFuture> async(UnitOfWork uow) { - if (uow == null) return async(); - return CompletableFuture.>supplyAsync(() -> { - try { + public Optional sync(UnitOfWork uow) throws TimeoutException { + if (uow == null) return sync(); - } catch (TimeoutException ex) { throw new CompletionException(ex); } - }); - } + + final Timer.Context context = requestLatency.time(); + try { + + Optional result = Optional.empty(); + String[] statementKeys = null; + + if (enableCache) { + Set facets = getFacets(); + statementKeys = getQueryKeys(); + result = Optional.of(checkCache(uow, facets, statementKeys)); + } + + if (!result.isPresent()) { + // Formulate the query and execute it against the Cassandra cluster. + ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, + showValues, true); + + // Transform the query result set into the desired shape. + result = transform(resultSet); + } + + // If we have a result and we're caching then we need to put it into the cache for future requests to find. + if (enableCache && result.isPresent()) { + updateCache(uow, result.get(), statementKeys); + } + + return result; + } finally { + context.stop(); + } + } + + public CompletableFuture> async() { + return CompletableFuture.>supplyAsync(() -> { + try { + return sync(); + } + catch (TimeoutException ex) { + throw new CompletionException(ex); + } + }); + } + + public CompletableFuture> async(UnitOfWork uow) { + if (uow == null) + return async(); + return CompletableFuture.>supplyAsync(() -> { + try { + return sync(); + } + catch (TimeoutException ex) { + throw new CompletionException(ex); + } + }); + } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java index bba8f9b..115ee51 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStatementOperation.java @@ -28,283 +28,357 @@ import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.querybuilder.BuiltStatement; import com.google.common.util.concurrent.ListenableFuture; import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.Helenus; +import net.helenus.core.UnitOfWork; +import net.helenus.core.cache.EntityIdentifyingFacet; +import net.helenus.mapping.HelenusProperty; +import net.helenus.mapping.value.BeanColumnValueProvider; +import net.helenus.support.Either; import net.helenus.support.HelenusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public abstract class AbstractStatementOperation> - extends Operation { + extends Operation { - final Logger logger = LoggerFactory.getLogger(getClass()); - - public abstract Statement buildStatement(boolean cached); - - protected boolean enableCache = true; - protected boolean showValues = true; - protected TraceContext traceContext; - private ConsistencyLevel consistencyLevel; - private ConsistencyLevel serialConsistencyLevel; - private RetryPolicy retryPolicy; - private boolean idempotent = false; - private boolean enableTracing = false; - private long[] defaultTimestamp = null; - private int[] fetchSize = null; - long queryExecutionTimeout = 10; - TimeUnit queryTimeoutUnits = TimeUnit.SECONDS; - - public AbstractStatementOperation(AbstractSessionOperations sessionOperations) { - super(sessionOperations); - this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel(); - this.idempotent = sessionOperations.getDefaultQueryIdempotency(); - } - - public O ignoreCache(boolean enabled) { - enableCache = enabled; - return (O) this; - } - - public O ignoreCache() { - enableCache = true; - return (O) this; - } - - public O showValues(boolean enabled) { - this.showValues = enabled; - return (O) this; - } - - public O defaultTimestamp(long timestamp) { - this.defaultTimestamp = new long[1]; - this.defaultTimestamp[0] = timestamp; - return (O) this; - } - - public O retryPolicy(RetryPolicy retryPolicy) { - this.retryPolicy = retryPolicy; - return (O) this; - } - - public O defaultRetryPolicy() { - this.retryPolicy = DefaultRetryPolicy.INSTANCE; - return (O) this; - } - - public O idempotent() { - this.idempotent = true; - return (O) this; - } - - public O isIdempotent(boolean idempotent) { - this.idempotent = idempotent; - return (O) this; - } - - public O downgradingConsistencyRetryPolicy() { - this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE; - return (O) this; - } - - public O fallthroughRetryPolicy() { - this.retryPolicy = FallthroughRetryPolicy.INSTANCE; - return (O) this; - } - - public O consistency(ConsistencyLevel level) { - this.consistencyLevel = level; - return (O) this; - } - - public O consistencyAny() { - this.consistencyLevel = ConsistencyLevel.ANY; - return (O) this; - } - - public O consistencyOne() { - this.consistencyLevel = ConsistencyLevel.ONE; - return (O) this; - } - - public O consistencyQuorum() { - this.consistencyLevel = ConsistencyLevel.QUORUM; - return (O) this; - } - - public O consistencyAll() { - this.consistencyLevel = ConsistencyLevel.ALL; - return (O) this; - } - - public O consistencyLocalOne() { - this.consistencyLevel = ConsistencyLevel.LOCAL_ONE; - return (O) this; - } - - public O consistencyLocalQuorum() { - this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM; - return (O) this; - } - - public O consistencyEachQuorum() { - this.consistencyLevel = ConsistencyLevel.EACH_QUORUM; - return (O) this; - } - - public O serialConsistency(ConsistencyLevel level) { - this.serialConsistencyLevel = level; - return (O) this; - } - - public O serialConsistencyAny() { - this.serialConsistencyLevel = ConsistencyLevel.ANY; - return (O) this; - } - - public O serialConsistencyOne() { - this.serialConsistencyLevel = ConsistencyLevel.ONE; - return (O) this; - } - - public O serialConsistencyQuorum() { - this.serialConsistencyLevel = ConsistencyLevel.QUORUM; - return (O) this; - } - - public O serialConsistencyAll() { - this.serialConsistencyLevel = ConsistencyLevel.ALL; - return (O) this; - } - - public O serialConsistencyLocal() { - this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL; - return (O) this; - } - - public O serialConsistencyLocalQuorum() { - this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM; - return (O) this; - } - - public O disableTracing() { - this.enableTracing = false; - return (O) this; - } - - public O enableTracing() { - this.enableTracing = true; - return (O) this; - } - - public O tracing(boolean enable) { - this.enableTracing = enable; - return (O) this; - } - - public O fetchSize(int fetchSize) { - this.fetchSize = new int[1]; - this.fetchSize[0] = fetchSize; - return (O) this; - } - - public O queryTimeoutMs(long ms) { - this.queryExecutionTimeout = ms; - this.queryTimeoutUnits = TimeUnit.MILLISECONDS; - return (O) this; - } - - public O queryTimeout(long timeout, TimeUnit units) { - this.queryExecutionTimeout = timeout; - this.queryTimeoutUnits = units; - return (O) this; - } - - public Statement options(Statement statement) { - - if (defaultTimestamp != null) { - statement.setDefaultTimestamp(defaultTimestamp[0]); + final Logger logger = LoggerFactory.getLogger(getClass()); + protected boolean enableCache = true; + protected boolean showValues = true; + protected TraceContext traceContext; + long queryExecutionTimeout = 10; + TimeUnit queryTimeoutUnits = TimeUnit.SECONDS; + private ConsistencyLevel consistencyLevel; + private ConsistencyLevel serialConsistencyLevel; + private RetryPolicy retryPolicy; + private boolean idempotent = false; + private boolean enableTracing = false; + private long[] defaultTimestamp = null; + private int[] fetchSize = null; + public AbstractStatementOperation(AbstractSessionOperations sessionOperations) { + super(sessionOperations); + this.consistencyLevel = sessionOperations.getDefaultConsistencyLevel(); + this.idempotent = sessionOperations.getDefaultQueryIdempotency(); } - if (consistencyLevel != null) { - statement.setConsistencyLevel(consistencyLevel); + public abstract Statement buildStatement(boolean cached); + + public O ignoreCache(boolean enabled) { + enableCache = enabled; + return (O) this; } - if (serialConsistencyLevel != null) { - statement.setSerialConsistencyLevel(serialConsistencyLevel); + public O ignoreCache() { + enableCache = true; + return (O) this; } - if (retryPolicy != null) { - statement.setRetryPolicy(retryPolicy); + public O showValues(boolean enabled) { + this.showValues = enabled; + return (O) this; } - if (enableTracing) { - statement.enableTracing(); - } else { - statement.disableTracing(); + public O defaultTimestamp(long timestamp) { + this.defaultTimestamp = new long[1]; + this.defaultTimestamp[0] = timestamp; + return (O) this; } - if (fetchSize != null) { - statement.setFetchSize(fetchSize[0]); + public O retryPolicy(RetryPolicy retryPolicy) { + this.retryPolicy = retryPolicy; + return (O) this; } - if (idempotent) { - statement.setIdempotent(true); + public O defaultRetryPolicy() { + this.retryPolicy = DefaultRetryPolicy.INSTANCE; + return (O) this; } - return statement; - } - - public O zipkinContext(TraceContext traceContext) { - if (traceContext != null) { - Tracer tracer = this.sessionOps.getZipkinTracer(); - if (tracer != null) { - this.traceContext = traceContext; - } + public O idempotent() { + this.idempotent = true; + return (O) this; } - return (O) this; - } - - public Statement statement() { - return buildStatement(false); - } - - public String cql() { - Statement statement = buildStatement(false); - if (statement == null) return ""; - if (statement instanceof BuiltStatement) { - BuiltStatement buildStatement = (BuiltStatement) statement; - return buildStatement.setForceNoValues(true).getQueryString(); - } else { - return statement.toString(); - } - } - - public PreparedStatement prepareStatement() { - - Statement statement = buildStatement(true); - - if (statement instanceof RegularStatement) { - - RegularStatement regularStatement = (RegularStatement) statement; - - return sessionOps.prepare(regularStatement); + public O isIdempotent(boolean idempotent) { + this.idempotent = idempotent; + return (O) this; } - throw new HelenusException("only RegularStatements can be prepared"); - } - - public ListenableFuture prepareStatementAsync() { - - Statement statement = buildStatement(true); - - if (statement instanceof RegularStatement) { - - RegularStatement regularStatement = (RegularStatement) statement; - - return sessionOps.prepareAsync(regularStatement); + public O downgradingConsistencyRetryPolicy() { + this.retryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE; + return (O) this; + } + + public O fallthroughRetryPolicy() { + this.retryPolicy = FallthroughRetryPolicy.INSTANCE; + return (O) this; + } + + public O consistency(ConsistencyLevel level) { + this.consistencyLevel = level; + return (O) this; + } + + public O consistencyAny() { + this.consistencyLevel = ConsistencyLevel.ANY; + return (O) this; + } + + public O consistencyOne() { + this.consistencyLevel = ConsistencyLevel.ONE; + return (O) this; + } + + public O consistencyQuorum() { + this.consistencyLevel = ConsistencyLevel.QUORUM; + return (O) this; + } + + public O consistencyAll() { + this.consistencyLevel = ConsistencyLevel.ALL; + return (O) this; + } + + public O consistencyLocalOne() { + this.consistencyLevel = ConsistencyLevel.LOCAL_ONE; + return (O) this; + } + + public O consistencyLocalQuorum() { + this.consistencyLevel = ConsistencyLevel.LOCAL_QUORUM; + return (O) this; + } + + public O consistencyEachQuorum() { + this.consistencyLevel = ConsistencyLevel.EACH_QUORUM; + return (O) this; + } + + public O serialConsistency(ConsistencyLevel level) { + this.serialConsistencyLevel = level; + return (O) this; + } + + public O serialConsistencyAny() { + this.serialConsistencyLevel = ConsistencyLevel.ANY; + return (O) this; + } + + public O serialConsistencyOne() { + this.serialConsistencyLevel = ConsistencyLevel.ONE; + return (O) this; + } + + public O serialConsistencyQuorum() { + this.serialConsistencyLevel = ConsistencyLevel.QUORUM; + return (O) this; + } + + public O serialConsistencyAll() { + this.serialConsistencyLevel = ConsistencyLevel.ALL; + return (O) this; + } + + public O serialConsistencyLocal() { + this.serialConsistencyLevel = ConsistencyLevel.LOCAL_SERIAL; + return (O) this; + } + + public O serialConsistencyLocalQuorum() { + this.serialConsistencyLevel = ConsistencyLevel.LOCAL_QUORUM; + return (O) this; + } + + public O disableTracing() { + this.enableTracing = false; + return (O) this; + } + + public O enableTracing() { + this.enableTracing = true; + return (O) this; + } + + public O tracing(boolean enable) { + this.enableTracing = enable; + return (O) this; + } + + public O fetchSize(int fetchSize) { + this.fetchSize = new int[1]; + this.fetchSize[0] = fetchSize; + return (O) this; + } + + public O queryTimeoutMs(long ms) { + this.queryExecutionTimeout = ms; + this.queryTimeoutUnits = TimeUnit.MILLISECONDS; + return (O) this; + } + + public O queryTimeout(long timeout, TimeUnit units) { + this.queryExecutionTimeout = timeout; + this.queryTimeoutUnits = units; + return (O) this; + } + + public Statement options(Statement statement) { + + if (defaultTimestamp != null) { + statement.setDefaultTimestamp(defaultTimestamp[0]); + } + + if (consistencyLevel != null) { + statement.setConsistencyLevel(consistencyLevel); + } + + if (serialConsistencyLevel != null) { + statement.setSerialConsistencyLevel(serialConsistencyLevel); + } + + if (retryPolicy != null) { + statement.setRetryPolicy(retryPolicy); + } + + if (enableTracing) { + statement.enableTracing(); + } else { + statement.disableTracing(); + } + + if (fetchSize != null) { + statement.setFetchSize(fetchSize[0]); + } + + if (idempotent) { + statement.setIdempotent(true); + } + + return statement; + } + + public O zipkinContext(TraceContext traceContext) { + if (traceContext != null) { + Tracer tracer = this.sessionOps.getZipkinTracer(); + if (tracer != null) { + this.traceContext = traceContext; + } + } + + return (O) this; + } + + public Statement statement() { + return buildStatement(false); + } + + public String cql() { + Statement statement = buildStatement(false); + if (statement == null) + return ""; + if (statement instanceof BuiltStatement) { + BuiltStatement buildStatement = (BuiltStatement) statement; + return buildStatement.setForceNoValues(true).getQueryString(); + } else { + return statement.toString(); + } + } + + public PreparedStatement prepareStatement() { + + Statement statement = buildStatement(true); + + if (statement instanceof RegularStatement) { + + RegularStatement regularStatement = (RegularStatement) statement; + + return sessionOps.prepare(regularStatement); + } + + throw new HelenusException("only RegularStatements can be prepared"); + } + + public ListenableFuture prepareStatementAsync() { + + Statement statement = buildStatement(true); + + if (statement instanceof RegularStatement) { + + RegularStatement regularStatement = (RegularStatement) statement; + + return sessionOps.prepareAsync(regularStatement); + } + + throw new HelenusException("only RegularStatements can be prepared"); + } + + protected E checkCache(UnitOfWork uow, Set facets, String[] statementKeys) { + E result = null; + + if (!facets.isEmpty()) { + //TODO(gburd): what about select ResultSet, Tuple... etc.? + Optional>> optionalCachedResult = uow.cacheLookupByFacet(facets); + if (optionalCachedResult.isPresent()) { + Either> eitherCachedResult = optionalCachedResult.get(); + if (eitherCachedResult.isLeft()) { + uowCacheHits.mark(); + logger.info("UnitOfWork({}) cache hit using facets", uow.hashCode()); + result = (E) eitherCachedResult.getLeft(); + } + } + } else { + // Then check to see if this query happens to uniquely identify a single object in the + // cache. + Optional>> optionalCachedResult = uow.cacheLookupByStatement(statementKeys); + if (optionalCachedResult.isPresent()) { + Either> eitherCachedResult = optionalCachedResult.get(); + // Statements always store Set as the value in the cache. + if (eitherCachedResult.isRight()) { + Set cachedResult = eitherCachedResult.getRight(); + if (cachedResult.size() == 1) { + Optional maybeResult = cachedResult.stream().findFirst(); + if (maybeResult.isPresent()) { + uowCacheHits.mark(); + logger.info("UnitOfWork({}) cache hit for stmt", uow.hashCode()); + } else { + result = null; + } + } + } + } + } + if (result == null) { + uowCacheMiss.mark(); + logger.info("UnitOfWork({}) cache miss", uow.hashCode()); + } + return result; + } + + protected void updateCache(UnitOfWork uow, E pojo, String[] statementKeys) { + + // Insert this entity into the cache for each facet for this entity that we can fully bind. + Map facetMap = Helenus.entity(pojo.getClass()).getIdentifyingFacets(); + facetMap.forEach((facetName, facet) -> { + if (!facet.isFullyBound()) { + HelenusProperty prop = facet.getProperty(); + Object value = BeanColumnValueProvider.INSTANCE.getColumnValue(pojo, -1, prop); + facet.setValueForProperty(prop, value); + } + }); + + // Cache the value (pojo), the statement key, and the fully bound facets. + if (statementKeys != null) { + uow.cacheUpdate(Either.left(pojo), statementKeys, + facetMap.values() + .stream() + .filter(facet -> facet.isFullyBound()) + .collect(Collectors.toSet())); + } } - throw new HelenusException("only RegularStatements can be prepared"); - } } diff --git a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java index 17fcfa4..2c253ba 100644 --- a/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/AbstractStreamOperation.java @@ -21,13 +21,15 @@ import com.datastax.driver.core.ResultSet; import com.google.common.base.Function; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import net.helenus.core.AbstractSessionOperations; +import net.helenus.core.UnitOfWork; +import net.helenus.core.cache.EntityIdentifyingFacet; + import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; -import net.helenus.core.AbstractSessionOperations; -import net.helenus.core.UnitOfWork; public abstract class AbstractStreamOperation> extends AbstractStatementOperation { @@ -64,33 +66,35 @@ public abstract class AbstractStreamOperation sync(UnitOfWork uow) throws TimeoutException { - if (uow == null) return sync(); + public Stream sync(UnitOfWork uow) throws TimeoutException { + if (uow == null) + return sync(); final Timer.Context context = requestLatency.time(); try { - Stream result = null; - String key = getStatementCacheKey(); - if (enableCache && key != null) { - Set cachedResult = (Set) uow.cacheLookup(key); - if (cachedResult != null) { - //TODO(gburd): what about select ResultSet, Tuple... etc.? - uowCacheHits.mark(); - logger.info("UOW({}) cache hit, {}", uow.hashCode()); - result = cachedResult.stream(); - } else { - uowCacheMiss.mark(); - } - } + Stream result = null; + E cachedResult = null; + String[] statementKeys = null; - if (result == null) { - ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, showValues, true); - result = transform(resultSet); - - if (key != null) { - uow.getCache().put(key, (Set) result); + if (enableCache) { + Set facets = getFacets(); + statementKeys = getQueryKeys(); + cachedResult = checkCache(uow, facets, statementKeys); + if (cachedResult != null) { + result = Stream.of(cachedResult); + } + } + + if (result == null) { + ResultSet resultSet = execute(sessionOps, uow, traceContext, queryExecutionTimeout, queryTimeoutUnits, + showValues, true); + result = transform(resultSet); + } + + // If we have a result and we're caching then we need to put it into the cache for future requests to find. + if (enableCache && cachedResult != null) { + updateCache(uow, cachedResult, statementKeys); } - } return result; } finally { @@ -106,7 +110,7 @@ public abstract class AbstractStreamOperation> async(UnitOfWork uow) { + public CompletableFuture> async(UnitOfWork uow) { if (uow == null) return async(); return CompletableFuture.>supplyAsync(() -> { try { diff --git a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java index 7a3cf2c..1c7e03c 100644 --- a/src/main/java/net/helenus/core/operation/BoundStreamOperation.java +++ b/src/main/java/net/helenus/core/operation/BoundStreamOperation.java @@ -18,6 +18,9 @@ package net.helenus.core.operation; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Statement; +import net.helenus.core.cache.EntityIdentifyingFacet; + +import java.util.Set; import java.util.stream.Stream; public final class BoundStreamOperation @@ -34,10 +37,13 @@ public final class BoundStreamOperation } @Override - public String getStatementCacheKey() { - return delegate.getStatementCacheKey(); + public String[] getQueryKeys() { + return delegate.getQueryKeys(); } + @Override + public Set getFacets() { return delegate.getFacets(); } + @Override public Stream transform(ResultSet resultSet) { return delegate.transform(resultSet); diff --git a/src/main/java/net/helenus/core/operation/InsertOperation.java b/src/main/java/net/helenus/core/operation/InsertOperation.java index 61b883c..30bf94c 100644 --- a/src/main/java/net/helenus/core/operation/InsertOperation.java +++ b/src/main/java/net/helenus/core/operation/InsertOperation.java @@ -240,7 +240,7 @@ public final class InsertOperation extends AbstractOperation keys = new ArrayList<>(values.size()); values.forEach( t -> { @@ -248,13 +248,13 @@ public final class InsertOperation extends AbstractOperation extends AbstractOperation iface = entity.getMappingInterface(); if (resultType == iface) { - String key = getStatementCacheKey(); - if (key != null) { - Set set = new HashSet(1); - set.add(result); - uow.getCache().put(key, set); - } + updateCache(uow, result, getQueryKeys()); } return result; } diff --git a/src/main/java/net/helenus/core/operation/Operation.java b/src/main/java/net/helenus/core/operation/Operation.java index 4099068..4fe5efc 100644 --- a/src/main/java/net/helenus/core/operation/Operation.java +++ b/src/main/java/net/helenus/core/operation/Operation.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeoutException; import net.helenus.core.AbstractSessionOperations; import net.helenus.core.UnitOfWork; +import net.helenus.core.cache.EntityIdentifyingFacet; import net.helenus.support.HelenusException; public abstract class Operation { @@ -75,7 +76,12 @@ public abstract class Operation { return null; } - public Set getIdentifyingFacets() { + public String[] getQueryKeys() { + return null; + } + + public Set getFacets() { return null; } + } diff --git a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java index d5cf01e..523dbbd 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstOperation.java @@ -17,7 +17,10 @@ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.querybuilder.BuiltStatement; +import net.helenus.core.cache.EntityIdentifyingFacet; + import java.util.Optional; +import java.util.Set; import java.util.function.Function; public final class SelectFirstOperation @@ -38,8 +41,8 @@ public final class SelectFirstOperation } @Override - public String getStatementCacheKey() { - return delegate.getStatementCacheKey(); + public String[] getQueryKeys() { + return delegate.getQueryKeys(); } @Override @@ -47,6 +50,9 @@ public final class SelectFirstOperation return delegate.buildStatement(cached); } + @Override + public Set getFacets() { return delegate.getFacets(); } + @Override public Optional transform(ResultSet resultSet) { return delegate.transform(resultSet).findFirst(); diff --git a/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java b/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java index 8ef4f60..95f18e8 100644 --- a/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectFirstTransformingOperation.java @@ -17,7 +17,10 @@ package net.helenus.core.operation; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.querybuilder.BuiltStatement; +import net.helenus.core.cache.EntityIdentifyingFacet; + import java.util.Optional; +import java.util.Set; import java.util.function.Function; public final class SelectFirstTransformingOperation @@ -36,10 +39,13 @@ public final class SelectFirstTransformingOperation } @Override - public String getStatementCacheKey() { - return delegate.getStatementCacheKey(); + public String[] getQueryKeys() { + return delegate.getQueryKeys(); } + @Override + public Set getFacets() { return delegate.getFacets(); } + @Override public BuiltStatement buildStatement(boolean cached) { return delegate.buildStatement(cached); diff --git a/src/main/java/net/helenus/core/operation/SelectOperation.java b/src/main/java/net/helenus/core/operation/SelectOperation.java index dcb916c..73a4553 100644 --- a/src/main/java/net/helenus/core/operation/SelectOperation.java +++ b/src/main/java/net/helenus/core/operation/SelectOperation.java @@ -15,6 +15,7 @@ */ package net.helenus.core.operation; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.querybuilder.BuiltStatement; import com.datastax.driver.core.querybuilder.Ordering; @@ -22,11 +23,13 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.querybuilder.Select.Selection; import com.datastax.driver.core.querybuilder.Select.Where; -import com.google.common.base.Joiner; + import java.util.*; import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; + +import com.google.common.collect.Iterables; import net.helenus.core.*; import net.helenus.core.cache.EntityIdentifyingFacet; import net.helenus.core.reflect.HelenusPropertyNode; @@ -194,28 +197,24 @@ public final class SelectOperation extends AbstractFilterStreamOperation getIdentityFacets() { + public Set getFacets() { HelenusEntity entity = props.get(0).getEntity(); final Set facets = new HashSet<>(filters.size()); // Check to see if this select statement has enough information to build one or // more identifying facets. entity - .getIdentityFacets() + .getIdentifyingFacets() .forEach( (facetName, facet) -> { - EntityIdentifyingFacet boundFacet = null; - if (!facet.isFullyBound()) { - boundFacet = new EntityIdentifyingFacet(facet); - for (HelenusProperty prop : facet.getUnboundEntityProperties()) { - Filter filter = filters.get(facet.getProperty()); - if (filter == null) { - break; + if (facet.isFullyBound()) { + facets.add(facet); + } else { + HelenusProperty prop = facet.getProperty(); + Filter filter = filters.get(prop); + if (filter != null) { + facet.setValueForProperty(prop, filter.toString()); + facets.add(facet); } - boundFacet.setValueForProperty(prop, filter.toString()); - } - } - if (boundFacet != null && boundFacet.isFullyBound()) { - facets.add(boundFacet); } }); return facets; @@ -245,7 +244,6 @@ public final class SelectOperation extends AbstractFilterStreamOperation extends AbstractFilterStreamOperation } @Override - public String getStatementCacheKey() { - return delegate.getStatementCacheKey(); + public String[] getQueryKeys() { + return delegate.getQueryKeys(); } + @Override + public Set getFacets() { return delegate.getFacets(); } + @Override public BuiltStatement buildStatement(boolean cached) { return delegate.buildStatement(cached); diff --git a/src/main/java/net/helenus/core/operation/UpdateOperation.java b/src/main/java/net/helenus/core/operation/UpdateOperation.java index cf84b47..f4aa3c7 100644 --- a/src/main/java/net/helenus/core/operation/UpdateOperation.java +++ b/src/main/java/net/helenus/core/operation/UpdateOperation.java @@ -584,13 +584,9 @@ public final class UpdateOperation extends AbstractFilterOperation set = new HashSet(1); - set.add(result); - uow.getCache().put(key, set); - } + updateCache(uow, result, getQueryKeys()); } return result; } diff --git a/src/main/java/net/helenus/mapping/HelenusEntity.java b/src/main/java/net/helenus/mapping/HelenusEntity.java index fa84c7b..681d8d1 100644 --- a/src/main/java/net/helenus/mapping/HelenusEntity.java +++ b/src/main/java/net/helenus/mapping/HelenusEntity.java @@ -33,5 +33,5 @@ public interface HelenusEntity { HelenusProperty getProperty(String name); - Map getIdentityFacets(); + Map getIdentifyingFacets(); } diff --git a/src/main/java/net/helenus/mapping/HelenusMappingEntity.java b/src/main/java/net/helenus/mapping/HelenusMappingEntity.java index ceb9722..feeec77 100644 --- a/src/main/java/net/helenus/mapping/HelenusMappingEntity.java +++ b/src/main/java/net/helenus/mapping/HelenusMappingEntity.java @@ -124,9 +124,6 @@ public final class HelenusMappingEntity implements HelenusEntity { if (primaryProperties != null) { primaryFacet = new EntityIdentifyingFacet( - keyspace, - table, - schemaVersion, primaryProperties.toArray(new HelenusProperty[props.size()])); allFacetsBuilder.put("*", primaryFacet); primaryProperties = null; @@ -134,7 +131,7 @@ public final class HelenusMappingEntity implements HelenusEntity { Optional optionalIndexName = prop.getIndexName(); if (optionalIndexName.isPresent()) { EntityIdentifyingFacet facet = - new EntityIdentifyingFacet(keyspace, table, schemaVersion, prop); + new EntityIdentifyingFacet(prop); ancillaryFacetsBuilder.put(prop.getPropertyName(), facet); } } @@ -174,6 +171,11 @@ public final class HelenusMappingEntity implements HelenusEntity { return props.get(name); } + @Override + public Map getIdentifyingFacets() { + return allIdentityFacets; + } + @Override public IdentityName getName() { return name;